diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index 3c9b3695f..760bc498b 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -983,8 +983,8 @@ class DeltaSharingRestClient( ) case None => logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the" + - s" header, but server didn't respond with the header(${capabilities}), " + - s"for query($dsQueryId)." + s" header, but server didn't respond with the header(${capabilities})," + + getDsQueryIdForLogging ) } } diff --git a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala index 3d7939eb1..59523036d 100644 --- a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala @@ -67,8 +67,6 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { } integrationTest("Check headers") { - val httpRequest = new HttpGet("random_url") - def checkUserAgent(request: HttpRequestBase, containsStreaming: Boolean): Unit = { val h = request.getFirstHeader(HttpHeaders.USER_AGENT).getValue if (containsStreaming) { @@ -109,38 +107,39 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest { (false, true), (false, false) ).foreach { case (forStreaming, endStreamActionEnabled) => - var client = new DeltaSharingRestClient( + val httpRequest = new HttpGet("random_url") + var request = new DeltaSharingRestClient( testProfileProvider, forStreaming = forStreaming, endStreamActionEnabled = endStreamActionEnabled, readerFeatures = "willBeIgnored") .prepareHeaders(httpRequest, setIncludeEndStreamAction = endStreamActionEnabled) - checkUserAgent(client, forStreaming) - checkDeltaSharingCapabilities(client, "parquet", "", endStreamActionEnabled) + checkUserAgent(request, forStreaming) + checkDeltaSharingCapabilities(request, "parquet", "", endStreamActionEnabled) val readerFeatures = "deletionVectors,columnMapping,timestampNTZ" - client = new DeltaSharingRestClient( + request = new DeltaSharingRestClient( testProfileProvider, forStreaming = forStreaming, endStreamActionEnabled = endStreamActionEnabled, responseFormat = RESPONSE_FORMAT_DELTA, readerFeatures = readerFeatures) .prepareHeaders(httpRequest, setIncludeEndStreamAction = endStreamActionEnabled) - checkUserAgent(client, forStreaming) + checkUserAgent(request, forStreaming) checkDeltaSharingCapabilities( - client, "delta", s";$READER_FEATURES=$readerFeatures", endStreamActionEnabled + request, "delta", s";$READER_FEATURES=$readerFeatures", endStreamActionEnabled ) - client = new DeltaSharingRestClient( + request = new DeltaSharingRestClient( testProfileProvider, forStreaming = forStreaming, endStreamActionEnabled = endStreamActionEnabled, responseFormat = s"$RESPONSE_FORMAT_DELTA,$RESPONSE_FORMAT_PARQUET", readerFeatures = readerFeatures) .prepareHeaders(httpRequest, setIncludeEndStreamAction = endStreamActionEnabled) - checkUserAgent(client, forStreaming) + checkUserAgent(request, forStreaming) checkDeltaSharingCapabilities( - client, s"delta,parquet", s";$READER_FEATURES=$readerFeatures", endStreamActionEnabled + request, s"delta,parquet", s";$READER_FEATURES=$readerFeatures", endStreamActionEnabled ) } } diff --git a/client/src/test/scala/io/delta/sharing/client/util/ConfUtilsSuite.scala b/client/src/test/scala/io/delta/sharing/client/util/ConfUtilsSuite.scala index e4f93bb99..c9f791b14 100644 --- a/client/src/test/scala/io/delta/sharing/client/util/ConfUtilsSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/util/ConfUtilsSuite.scala @@ -83,6 +83,22 @@ class ConfUtilsSuite extends SparkFunSuite { }.getMessage.contains(TIMEOUT_CONF) } + test("includeEndStreamAction") { + assert(includeEndStreamAction(newConf()) == true) + assert(includeEndStreamAction(newConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "false"))) == false) + assert(includeEndStreamAction(newConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "random"))) == true) + + assert(includeEndStreamAction(newSqlConf()) == true) + assert( + includeEndStreamAction(newSqlConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "false"))) == false + ) + intercept[IllegalArgumentException] { + assert( + includeEndStreamAction(newSqlConf(Map(INCLUDE_END_STREAM_ACTION_CONF -> "random"))) == false + ) + }.getMessage.contains(INCLUDE_END_STREAM_ACTION_CONF) + } + test("maxConnections") { assert(maxConnections(newConf()) == MAX_CONNECTION_DEFAULT) assert(maxConnections(newConf(Map(MAX_CONNECTION_CONF -> "100"))) == 100)