Skip to content

Commit

Permalink
rename header from endStreamAction to includeEndStreamAction
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db committed Oct 5, 2024
1 parent 8a1d012 commit 1d1963d
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,7 @@ class DeltaSharingRestClient(
val lastLineAction = JsonUtils.fromJson[SingleAction](response.lines.last)
if (lastLineAction.endStreamAction == null) {
throw new IllegalStateException(s"Client sets " +
s"${DELTA_SHARING_END_STREAM_ACTION}=true in the " +
s"${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
s"header, server responded with the header set to true(${response.capabilities}, " +
s"and ${response.lines.size} lines, and last line parsed as " +
s"${lastLineAction.unwrap.getClass()}," + getDsQueryIdForLogging)
Expand All @@ -993,12 +993,12 @@ class DeltaSharingRestClient(
s"Successfully verified endStreamAction in the response" + getDsQueryIdForLogging
)
case Some(false) =>
logWarning(s"Client sets ${DELTA_SHARING_END_STREAM_ACTION}=true in the " +
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
s"header, but the server responded with the header set to false(" +
s"${response.capabilities})," + getDsQueryIdForLogging
)
case None =>
logWarning(s"Client sets ${DELTA_SHARING_END_STREAM_ACTION}=true in the" +
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the" +
s" header, but server didn't respond with the header(${response.capabilities}), " +
s"for query($dsQueryId)."
)
Expand All @@ -1009,7 +1009,7 @@ class DeltaSharingRestClient(
private def getRespondedHeaders(capabilities: Option[String]): (String, Option[Boolean]) = {
val capabilitiesMap = parseDeltaSharingCapabilities(capabilities)
val includedEndStreamActionOpt = capabilitiesMap
.get(DELTA_SHARING_END_STREAM_ACTION)
.get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION)
(
capabilitiesMap.get(RESPONSE_FORMAT).getOrElse(RESPONSE_FORMAT_PARQUET),
includedEndStreamActionOpt.map(_.toBoolean)
Expand Down Expand Up @@ -1189,7 +1189,7 @@ class DeltaSharingRestClient(
}

if (includeEndStreamAction) {
capabilities = capabilities :+ s"$DELTA_SHARING_END_STREAM_ACTION=true"
capabilities = capabilities :+ s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"
}

val cap = capabilities.mkString(DELTA_SHARING_CAPABILITIES_DELIMITER)
Expand All @@ -1214,7 +1214,7 @@ object DeltaSharingRestClient extends Logging {
val RESPONSE_FORMAT = "responseformat"
val READER_FEATURES = "readerfeatures"
val DELTA_SHARING_CAPABILITIES_ASYNC_READ = "asyncquery"
val DELTA_SHARING_END_STREAM_ACTION = "endstreamaction"
val DELTA_SHARING_INCLUDE_END_STREAM_ACTION = "includeendstreamaction"
val RESPONSE_FORMAT_DELTA = "delta"
val RESPONSE_FORMAT_PARQUET = "parquet"
val DELTA_SHARING_CAPABILITIES_DELIMITER = ";"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ class DeltaSharingService(serverConfig: ServerConfig) {
includeEndStreamAction: Boolean = false): HttpResponse = {
var capabilities = Seq[String](s"${DELTA_SHARING_RESPONSE_FORMAT}=$responseFormat")
if (includeEndStreamAction) {
capabilities = capabilities :+ s"$DELTA_SHARING_END_STREAM_ACTION=true"
capabilities = capabilities :+ s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"
}
val dsCapHeader = capabilities.mkString(DELTA_SHARING_CAPABILITIES_DELIMITER)

Expand Down Expand Up @@ -654,7 +654,7 @@ object DeltaSharingService {
val DELTA_SHARING_CAPABILITIES_HEADER = "delta-sharing-capabilities"
val DELTA_SHARING_RESPONSE_FORMAT = "responseformat"
val DELTA_SHARING_CAPABILITIES_ASYNC_QUERY = "asyncquery"
val DELTA_SHARING_END_STREAM_ACTION = "endstreamaction"
val DELTA_SHARING_INCLUDE_END_STREAM_ACTION = "includeendstreamaction"
val DELTA_SHARING_READER_FEATURES = "readerfeatures"
val DELTA_SHARING_CAPABILITIES_DELIMITER = ";"

Expand Down Expand Up @@ -777,7 +777,7 @@ object DeltaSharingService {
}

private def getEndStreamActionInHeader(headerCapabilities: Map[String, String]): Boolean = {
headerCapabilities.get(DELTA_SHARING_END_STREAM_ACTION).map(_.toBoolean).getOrElse(false)
headerCapabilities.get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION).map(_.toBoolean).getOrElse(false)
}

private[server] def getResponseFormatSet(headerCapabilities: Map[String, String]): Set[String] = {
Expand All @@ -796,7 +796,7 @@ object DeltaSharingService {

private[server] def getRequestEndStreamAction(
headerCapabilities: Map[String, String]): Boolean = {
headerCapabilities.get(DELTA_SHARING_END_STREAM_ACTION).exists(_.toBoolean)
headerCapabilities.get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION).exists(_.toBoolean)
}

def main(args: Array[String]): Unit = {
Expand Down

0 comments on commit 1d1963d

Please sign in to comment.