Skip to content

Commit

Permalink
merge head
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db committed Oct 8, 2024
2 parents d42fa22 + 1502f9d commit e60043a
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.apache.spark.sql.SparkSession
import io.delta.sharing.client.auth.{AuthConfig, AuthCredentialProviderFactory}
import io.delta.sharing.client.model._
import io.delta.sharing.client.util.{ConfUtils, JsonUtils, RetryUtils, UnexpectedHttpStatus}
import io.delta.sharing.spark.MissingEndStreamActionException

/** An interface to fetch Delta metadata from remote server. */
trait DeltaSharingClient {
Expand Down Expand Up @@ -1019,15 +1020,14 @@ class DeltaSharingRestClient(
case Some(true) =>
val lastLineAction = JsonUtils.fromJson[SingleAction](response.lines.last)
if (lastLineAction.endStreamAction == null) {
throw new IllegalStateException(s"Client sets " +
throw new MissingEndStreamActionException(s"Client sets " +
s"${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
s"header, server responded with the header set to true(${response.capabilities}, " +
s"and ${response.lines.size} lines, and last line parsed as " +
s"${lastLineAction.unwrap.getClass()}," + getDsQueryIdForLogging)
}
logInfo(
s"Successfully verified includeEndStreamAction in the response header" +
getDsQueryIdForLogging
s"Successfully verified endStreamAction in the response" + getDsQueryIdForLogging
)
case Some(false) =>
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import scala.util.control.NonFatal

import org.apache.spark.internal.Logging

import io.delta.sharing.spark.MissingEndStreamActionException

private[sharing] object RetryUtils extends Logging {

// Expose it for testing
Expand Down Expand Up @@ -70,6 +72,7 @@ private[sharing] object RetryUtils extends Logging {
} else {
false
}
case _: MissingEndStreamActionException => true
case _: java.net.SocketTimeoutException => true
// do not retry on ConnectionClosedException because it can be caused by invalid json returned
// from the delta sharing server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package io.delta.sharing.spark

import org.apache.spark.sql.types.StructType

class MissingEndStreamActionException(message: String) extends IllegalStateException(message)

object DeltaSharingErrors {
def nonExistentDeltaSharingTable(tableId: String): Throwable = {
new IllegalStateException(s"Delta sharing table ${tableId} doesn't exist. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.SparkFunSuite

import io.delta.sharing.client.util.{RetryUtils, UnexpectedHttpStatus}
import io.delta.sharing.client.util.RetryUtils._
import io.delta.sharing.spark.MissingEndStreamActionException

class RetryUtilsSuite extends SparkFunSuite {
test("shouldRetry") {
Expand All @@ -35,6 +36,7 @@ class RetryUtilsSuite extends SparkFunSuite {
assert(shouldRetry(new IOException))
assert(shouldRetry(new java.net.SocketTimeoutException))
assert(!shouldRetry(new RuntimeException))
assert(shouldRetry(new MissingEndStreamActionException("missing")))
}

test("runWithExponentialBackoff") {
Expand Down

0 comments on commit e60043a

Please sign in to comment.