diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index f40eb12f6..06bb3502b 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -984,7 +984,7 @@ class DeltaSharingRestClient( val lastLineAction = JsonUtils.fromJson[SingleAction](response.lines.last) if (lastLineAction.endStreamAction == null) { throw new IllegalStateException(s"Client sets " + - s"${DELTA_SHARING_END_STREAM_ACTION}=true in the " + + s"${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " + s"header, server responded with the header set to true(${response.capabilities}, " + s"and ${response.lines.size} lines, and last line parsed as " + s"${lastLineAction.unwrap.getClass()}," + getDsQueryIdForLogging) @@ -993,12 +993,12 @@ class DeltaSharingRestClient( s"Successfully verified endStreamAction in the response" + getDsQueryIdForLogging ) case Some(false) => - logWarning(s"Client sets ${DELTA_SHARING_END_STREAM_ACTION}=true in the " + + logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " + s"header, but the server responded with the header set to false(" + s"${response.capabilities})," + getDsQueryIdForLogging ) case None => - logWarning(s"Client sets ${DELTA_SHARING_END_STREAM_ACTION}=true in the" + + logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the" + s" header, but server didn't respond with the header(${response.capabilities}), " + s"for query($dsQueryId)." ) @@ -1009,7 +1009,7 @@ class DeltaSharingRestClient( private def getRespondedHeaders(capabilities: Option[String]): (String, Option[Boolean]) = { val capabilitiesMap = parseDeltaSharingCapabilities(capabilities) val includedEndStreamActionOpt = capabilitiesMap - .get(DELTA_SHARING_END_STREAM_ACTION) + .get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION) ( capabilitiesMap.get(RESPONSE_FORMAT).getOrElse(RESPONSE_FORMAT_PARQUET), includedEndStreamActionOpt.map(_.toBoolean) @@ -1189,7 +1189,7 @@ class DeltaSharingRestClient( } if (includeEndStreamAction) { - capabilities = capabilities :+ s"$DELTA_SHARING_END_STREAM_ACTION=true" + capabilities = capabilities :+ s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true" } val cap = capabilities.mkString(DELTA_SHARING_CAPABILITIES_DELIMITER) @@ -1214,7 +1214,7 @@ object DeltaSharingRestClient extends Logging { val RESPONSE_FORMAT = "responseformat" val READER_FEATURES = "readerfeatures" val DELTA_SHARING_CAPABILITIES_ASYNC_READ = "asyncquery" - val DELTA_SHARING_END_STREAM_ACTION = "endstreamaction" + val DELTA_SHARING_INCLUDE_END_STREAM_ACTION = "includeendstreamaction" val RESPONSE_FORMAT_DELTA = "delta" val RESPONSE_FORMAT_PARQUET = "parquet" val DELTA_SHARING_CAPABILITIES_DELIMITER = ";" diff --git a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala index 774afbfb1..610d0c957 100644 --- a/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala +++ b/server/src/main/scala/io/delta/sharing/server/DeltaSharingService.scala @@ -618,7 +618,7 @@ class DeltaSharingService(serverConfig: ServerConfig) { includeEndStreamAction: Boolean = false): HttpResponse = { var capabilities = Seq[String](s"${DELTA_SHARING_RESPONSE_FORMAT}=$responseFormat") if (includeEndStreamAction) { - capabilities = capabilities :+ s"$DELTA_SHARING_END_STREAM_ACTION=true" + capabilities = capabilities :+ s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true" } val dsCapHeader = capabilities.mkString(DELTA_SHARING_CAPABILITIES_DELIMITER) @@ -654,7 +654,7 @@ object DeltaSharingService { val DELTA_SHARING_CAPABILITIES_HEADER = "delta-sharing-capabilities" val DELTA_SHARING_RESPONSE_FORMAT = "responseformat" val DELTA_SHARING_CAPABILITIES_ASYNC_QUERY = "asyncquery" - val DELTA_SHARING_END_STREAM_ACTION = "endstreamaction" + val DELTA_SHARING_INCLUDE_END_STREAM_ACTION = "includeendstreamaction" val DELTA_SHARING_READER_FEATURES = "readerfeatures" val DELTA_SHARING_CAPABILITIES_DELIMITER = ";" @@ -777,7 +777,7 @@ object DeltaSharingService { } private def getEndStreamActionInHeader(headerCapabilities: Map[String, String]): Boolean = { - headerCapabilities.get(DELTA_SHARING_END_STREAM_ACTION).map(_.toBoolean).getOrElse(false) + headerCapabilities.get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION).map(_.toBoolean).getOrElse(false) } private[server] def getResponseFormatSet(headerCapabilities: Map[String, String]): Set[String] = { @@ -796,7 +796,7 @@ object DeltaSharingService { private[server] def getRequestEndStreamAction( headerCapabilities: Map[String, String]): Boolean = { - headerCapabilities.get(DELTA_SHARING_END_STREAM_ACTION).exists(_.toBoolean) + headerCapabilities.get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION).exists(_.toBoolean) } def main(args: Array[String]): Unit = {