Skip to content

Commit

Permalink
moved condition
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db committed Oct 8, 2024
1 parent e262f9c commit d42fa22
Showing 1 changed file with 27 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ class DeltaSharingRestClient(
lines,
capabilities = capabilities
)
if (setIncludeEndStreamAction) {
if (includeEndStreamAction && setIncludeEndStreamAction) {
checkEndStreamAction(response)
}
response
Expand Down Expand Up @@ -1007,40 +1007,38 @@ class DeltaSharingRestClient(
lines,
capabilities = capabilities
)
if (setIncludeEndStreamAction) {
if (includeEndStreamAction && setIncludeEndStreamAction) {
checkEndStreamAction(response)
}
response
}

private def checkEndStreamAction(response: ParsedDeltaSharingResponse): Unit = {
if (includeEndStreamAction) {
// Only perform additional check when includeEndStreamAction = true
response.includeEndStreamActionHeader match {
case Some(true) =>
val lastLineAction = JsonUtils.fromJson[SingleAction](response.lines.last)
if (lastLineAction.endStreamAction == null) {
throw new IllegalStateException(s"Client sets " +
s"${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
s"header, server responded with the header set to true(${response.capabilities}, " +
s"and ${response.lines.size} lines, and last line parsed as " +
s"${lastLineAction.unwrap.getClass()}," + getDsQueryIdForLogging)
}
logInfo(
s"Successfully verified includeEndStreamAction in the response header" +
getDsQueryIdForLogging
)
case Some(false) =>
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
s"header, but the server responded with the header set to false(" +
s"${response.capabilities})," + getDsQueryIdForLogging
)
case None =>
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the" +
s" header, but server didn't respond with the header(${response.capabilities}), " +
s"for query($dsQueryId)."
)
}
// Only perform additional check when includeEndStreamAction = true
response.includeEndStreamActionHeader match {
case Some(true) =>
val lastLineAction = JsonUtils.fromJson[SingleAction](response.lines.last)
if (lastLineAction.endStreamAction == null) {
throw new IllegalStateException(s"Client sets " +
s"${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
s"header, server responded with the header set to true(${response.capabilities}, " +
s"and ${response.lines.size} lines, and last line parsed as " +
s"${lastLineAction.unwrap.getClass()}," + getDsQueryIdForLogging)
}
logInfo(
s"Successfully verified includeEndStreamAction in the response header" +
getDsQueryIdForLogging
)
case Some(false) =>
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
s"header, but the server responded with the header set to false(" +
s"${response.capabilities})," + getDsQueryIdForLogging
)
case None =>
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the" +
s" header, but server didn't respond with the header(${response.capabilities}), " +
s"for query($dsQueryId)."
)
}
}

Expand Down

0 comments on commit d42fa22

Please sign in to comment.