diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala index c0613df0b..53be09f5a 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala @@ -98,15 +98,12 @@ case class ParsedDeltaSharingTablePath( * * @param version the table version of the shared table. * @param respondedFormat the sharing format (parquet or delta), used to parse the lines. - * @param includeEndStreamActionHeader whether the last line is required to be an EndStreamAction, - * parsed from the response header. * @param lines all lines in the response. * @param capabilities value of delta-sharing-capabilities in the response header */ case class ParsedDeltaSharingResponse( version: Long, respondedFormat: String, - includeEndStreamActionHeader: Option[Boolean], lines: Seq[String], capabilities: Option[String]) @@ -864,7 +861,6 @@ class DeltaSharingRestClient( val (version, capabilities, lines) = getResponse( new HttpGet(target), setIncludeEndStreamAction = setIncludeEndStreamAction ) - val (respondedFormat, includeEndStreamActionHeader) = getRespondedHeaders(capabilities) val response = ParsedDeltaSharingResponse( version = version.getOrElse { @@ -875,14 +871,10 @@ class DeltaSharingRestClient( 0L } }, - respondedFormat = respondedFormat, - includeEndStreamActionHeader = includeEndStreamActionHeader, + respondedFormat = getRespondedFormat(capabilities), lines, capabilities = capabilities ) - if (setIncludeEndStreamAction) { - checkEndStreamAction(response) - } response } @@ -999,7 +991,6 @@ class DeltaSharingRestClient( val (version, capabilities, lines) = getResponse( httpPost, setIncludeEndStreamAction = setIncludeEndStreamAction ) - val (respondedFormat, includeEndStreamActionHeader) = getRespondedHeaders(capabilities) val response = ParsedDeltaSharingResponse( version = version.getOrElse { @@ -1007,27 +998,23 @@ class DeltaSharingRestClient( "Cannot find Delta-Table-Version in the header" + getDsQueryIdForLogging ) }, - respondedFormat = respondedFormat, - includeEndStreamActionHeader = includeEndStreamActionHeader, + respondedFormat = getRespondedFormat(capabilities), lines, capabilities = capabilities ) - if (setIncludeEndStreamAction) { - checkEndStreamAction(response) - } response } - private def checkEndStreamAction(response: ParsedDeltaSharingResponse): Unit = { - // Only perform additional check when endStreamActionEnabled = true - response.includeEndStreamActionHeader match { + private def checkEndStreamAction(capabilities: Option[String], lines: Seq[String]): Unit = { + val includeEndStreamActionHeader = getRespondedIncludeEndStreamActionHeader(capabilities) + includeEndStreamActionHeader match { case Some(true) => - val lastLineAction = JsonUtils.fromJson[SingleAction](response.lines.last) + val lastLineAction = JsonUtils.fromJson[SingleAction](lines.last) if (lastLineAction.endStreamAction == null) { throw new MissingEndStreamActionException(s"Client sets " + s"${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " + - s"header, server responded with the header set to true(${response.capabilities}, " + - s"and ${response.lines.size} lines, and last line parsed as " + + s"header, server responded with the header set to true(${capabilities}, " + + s"and ${lines.size} lines, and last line parsed as " + s"${lastLineAction.unwrap.getClass()}," + getDsQueryIdForLogging) } logInfo( @@ -1036,24 +1023,27 @@ class DeltaSharingRestClient( case Some(false) => logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " + s"header, but the server responded with the header set to false(" + - s"${response.capabilities})," + getDsQueryIdForLogging + s"${capabilities})," + getDsQueryIdForLogging ) case None => logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the" + - s" header, but server didn't respond with the header(${response.capabilities}), " + + s" header, but server didn't respond with the header(${capabilities}), " + s"for query($dsQueryId)." ) } } - private def getRespondedHeaders(capabilities: Option[String]): (String, Option[Boolean]) = { + private def getRespondedFormat(capabilities: Option[String]): String = { val capabilitiesMap = parseDeltaSharingCapabilities(capabilities) - val includeEndStreamActionOpt = capabilitiesMap - .get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION) - ( - capabilitiesMap.get(RESPONSE_FORMAT).getOrElse(RESPONSE_FORMAT_PARQUET), - includeEndStreamActionOpt.map(_.toBoolean) - ) + capabilitiesMap.get(RESPONSE_FORMAT).getOrElse(RESPONSE_FORMAT_PARQUET) + } + + // includeEndStreamActionHeader indicates whether the last line is required to be an + // EndStreamAction, parsed from the response header. + private def getRespondedIncludeEndStreamActionHeader( + capabilities: Option[String]): Option[Boolean] = { + val capabilitiesMap = parseDeltaSharingCapabilities(capabilities) + capabilitiesMap.get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION).map(_.toBoolean) } private def parseDeltaSharingCapabilities(capabilities: Option[String]): Map[String, String] = { @@ -1185,22 +1175,26 @@ class DeltaSharingRestClient( var additionalErrorInfo = "" if (statusCode == HttpStatus.SC_UNAUTHORIZED && tokenExpired()) { additionalErrorInfo = s"It may be caused by an expired token as it has expired " + - s"at ${authCredentialProvider.getExpirationTime()}" + s"at ${authCredentialProvider.getExpirationTime()}." } // Only show the last 100 lines in the error to keep it contained. val responseToShow = lines.drop(lines.size - 100).mkString("\n") throw new UnexpectedHttpStatus( - s"HTTP request failed with status: $status $responseToShow. $additionalErrorInfo" - + getDsQueryIdForLogging, + s"HTTP request failed with status$getDsQueryIdForLogging: $status." + + s" $additionalErrorInfo $responseToShow", statusCode) } + val capabilities = Option( + response.getFirstHeader(DELTA_SHARING_CAPABILITIES_HEADER) + ).map(_.getValue) + if (setIncludeEndStreamAction) { + checkEndStreamAction(capabilities, lines) + } ( Option( response.getFirstHeader(RESPONSE_TABLE_VERSION_HEADER_KEY) ).map(_.getValue.toLong), - Option( - response.getFirstHeader(DELTA_SHARING_CAPABILITIES_HEADER) - ).map(_.getValue), + capabilities, lines ) } finally { diff --git a/client/src/main/scala/io/delta/sharing/client/util/RetryUtils.scala b/client/src/main/scala/io/delta/sharing/client/util/RetryUtils.scala index 6ce6e17a4..bbf6fa7cb 100644 --- a/client/src/main/scala/io/delta/sharing/client/util/RetryUtils.scala +++ b/client/src/main/scala/io/delta/sharing/client/util/RetryUtils.scala @@ -50,11 +50,11 @@ private[sharing] object RetryUtils extends Logging { e ) if (shouldRetry(e) && times <= numRetries && totalDuration <= maxDurationMillis) { - logWarning(s"Sleeping $sleepMs ms to retry") + logWarning(s"Sleeping $sleepMs ms to retry on error: ${e.getMessage}.") sleeper(sleepMs) sleepMs *= 2 } else { - logError(s"Not retrying delta sharing rpc on error: ${e.getMessage}", e) + logError(s"Not retrying delta sharing rpc on error: ${e.getMessage}.") throw e } }