Skip to content

Commit

Permalink
Backport includeEndStreamAction in DeltasharingService to branch-1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db committed Oct 25, 2024
1 parent 1f28ffb commit 1832fd8
Show file tree
Hide file tree
Showing 14 changed files with 558 additions and 248 deletions.
254 changes: 192 additions & 62 deletions client/src/main/scala/io/delta/sharing/client/DeltaSharingClient.scala

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions client/src/main/scala/io/delta/sharing/client/util/ConfUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ object ConfUtils {
val MAX_RETRY_DURATION_CONF = "spark.delta.sharing.network.maxRetryDuration"
val MAX_RETRY_DURATION_DEFAULT_MILLIS = 10L * 60L* 1000L /* 10 mins */

val INCLUDE_END_STREAM_ACTION_CONF = "spark.delta.sharing.query.includeEndStreamAction"
val INCLUDE_END_STREAM_ACTION_DEFAULT = "false"

val TIMEOUT_CONF = "spark.delta.sharing.network.timeout"
val TIMEOUT_DEFAULT = "320s"

Expand Down Expand Up @@ -111,6 +114,14 @@ object ConfUtils {
maxDur
}

/**
 * Reads the `spark.delta.sharing.query.includeEndStreamAction` flag from the given
 * `Configuration`.
 *
 * @param conf configuration object to query
 * @return the value reported by `conf.getBoolean`, which is given the parsed
 *         `INCLUDE_END_STREAM_ACTION_DEFAULT` ("false") as its fallback
 */
def includeEndStreamAction(conf: Configuration): Boolean = {
  // The default is stored as a string so it can be shared with the SQLConf overload.
  val fallback = INCLUDE_END_STREAM_ACTION_DEFAULT.toBoolean
  conf.getBoolean(INCLUDE_END_STREAM_ACTION_CONF, fallback)
}

/**
 * Reads the `spark.delta.sharing.query.includeEndStreamAction` flag from the given `SQLConf`.
 *
 * The setting is looked up as a string, with `INCLUDE_END_STREAM_ACTION_DEFAULT` as the
 * fallback, and then converted with `toBoolean`. `toBoolean` throws
 * `IllegalArgumentException` for any string other than "true"/"false" (case-insensitive).
 *
 * @param conf SQL configuration to query
 * @return the boolean value of the setting
 */
def includeEndStreamAction(conf: SQLConf): Boolean = {
  val rawValue =
    conf.getConfString(INCLUDE_END_STREAM_ACTION_CONF, INCLUDE_END_STREAM_ACTION_DEFAULT)
  rawValue.toBoolean
}

def timeoutInSeconds(conf: Configuration): Int = {
val timeoutStr = conf.get(TIMEOUT_CONF, TIMEOUT_DEFAULT)
toTimeInSeconds(timeoutStr, TIMEOUT_CONF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import scala.util.control.NonFatal

import org.apache.spark.internal.Logging

import io.delta.sharing.spark.MissingEndStreamActionException

private[sharing] object RetryUtils extends Logging {

// Expose it for testing
Expand Down Expand Up @@ -70,6 +72,7 @@ private[sharing] object RetryUtils extends Logging {
} else {
false
}
case _: MissingEndStreamActionException => true
case _: java.net.SocketTimeoutException => true
// do not retry on ConnectionClosedException because it can be caused by invalid json returned
// from the delta sharing server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package io.delta.sharing.spark

import org.apache.spark.sql.types.StructType

/**
 * An `IllegalStateException` subtype that carries only a message.
 *
 * NOTE(review): judging by the name, this is raised when a response lacks the expected
 * end-stream action; the raise site is not visible here, so confirm against the client code.
 *
 * @param message detail message forwarded to `IllegalStateException`
 */
class MissingEndStreamActionException(message: String)
  extends IllegalStateException(message)

object DeltaSharingErrors {
def nonExistentDeltaSharingTable(tableId: String): Throwable = {
new IllegalStateException(s"Delta sharing table ${tableId} doesn't exist. " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.delta.sharing.spark

// scalastyle:off import.ordering.noEmptyLine
import java.lang.ref.WeakReference
import java.util.UUID

import scala.collection.mutable.ArrayBuffer

Expand Down Expand Up @@ -100,6 +101,8 @@ case class DeltaSharingSource(
assert(deltaLog.client.getForStreaming,
"forStreaming must be true for client in DeltaSharingSource.")

private val sourceId = Some(UUID.randomUUID().toString().split('-').head)

// The snapshot that's used to construct the dataframe, constructed when source is initialized.
// Use latest snapshot instead of snapshot at startingVersion, to allow easy recovery from
// failures on schema incompatibility.
Expand Down Expand Up @@ -148,10 +151,12 @@ case class DeltaSharingSource(
val intervalSeconds = ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS.max(
ConfUtils.streamingQueryTableVersionIntervalSeconds(spark.sessionState.conf)
)
logInfo(s"Configured queryTableVersionIntervalSeconds:${intervalSeconds}.")
logInfo(s"Configured queryTableVersionIntervalSeconds:${intervalSeconds}," +
getTableInfoForLogging)
if (intervalSeconds < ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS) {
throw new IllegalArgumentException(s"QUERY_TABLE_VERSION_INTERVAL_MILLIS($intervalSeconds) " +
s"must not be less than ${ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS} seconds.")
s"must not be less than ${ConfUtils.MINIMUM_TABLE_VERSION_INTERVAL_SECONDS} seconds,"
+ getTableInfoForLogging)
}
intervalSeconds * 1000
}
Expand All @@ -166,6 +171,13 @@ case class DeltaSharingSource(
TableRefreshResult(Map.empty[String, String], None, None)
}

// Suffix appended to log and error messages to identify the table and this source instance.
// The string starts with a space so callers can append it directly after a trailing comma.
// sourceId is wrapped in Some(...), so it is unwrapped before interpolation; interpolating
// the Option directly would render as "source:Some(<id>)" in every message.
private lazy val getTableInfoForLogging: String = {
  val sourceLabel = sourceId.getOrElse("")
  s" for table(id:$tableId, name:${deltaLog.table.toString}, source:$sourceLabel)"
}

// Suffix appended to log messages to report the query id currently exposed by the client.
// Declared as a def, so the client is asked for its query id again on every call.
private def getQueryIdForLogging: String = {
  val queryId = deltaLog.client.getQueryId
  s", with queryId($queryId)"
}

// Check the latest table version from the delta sharing server through the client.getTableVersion
// RPC. Adding a minimum interval of QUERY_TABLE_VERSION_INTERVAL_MILLIS between two consecutive
// rpcs to avoid traffic jam on the delta sharing server.
Expand All @@ -174,13 +186,14 @@ case class DeltaSharingSource(
if (lastGetVersionTimestamp == -1 ||
(currentTimeMillis - lastGetVersionTimestamp) >= QUERY_TABLE_VERSION_INTERVAL_MILLIS) {
val serverVersion = deltaLog.client.getTableVersion(deltaLog.table)
logInfo(s"Got table version $serverVersion from Delta Sharing Server.")
logInfo(s"Got table version $serverVersion from Delta Sharing Server," +
getTableInfoForLogging)
if (serverVersion < 0) {
throw new IllegalStateException(s"Delta Sharing Server returning negative table version:" +
s"$serverVersion.")
s"$serverVersion," + getTableInfoForLogging)
} else if (serverVersion < latestTableVersion) {
logWarning(s"Delta Sharing Server returning smaller table version:$serverVersion < " +
s"$latestTableVersion.")
s"$latestTableVersion, " + getTableInfoForLogging)
}
latestTableVersion = serverVersion
lastGetVersionTimestamp = currentTimeMillis
Expand Down Expand Up @@ -246,7 +259,7 @@ case class DeltaSharingSource(
s"$fromVersion, $fromIndex, $isStartingVersion) is not included in sortedFetchedFiles[" +
s"(${headFile.version}, ${headFile.index}, ${headFile.isSnapshot}) to " +
s"(${lastFile.version}, ${lastFile.index}, ${lastFile.isSnapshot})], " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
getTableInfoForLogging)
sortedFetchedFiles = Seq.empty
} else {
return
Expand All @@ -265,7 +278,7 @@ case class DeltaSharingSource(
logInfo(s"Reducing ending version for delta sharing rpc from currentLatestVersion(" +
s"$currentLatestVersion) to endingVersionForQuery($endingVersionForQuery), fromVersion:" +
s"$fromVersion, maxVersionsPerRpc:$maxVersionsPerRpc, " +
s"for table(id:$tableId, name:${deltaLog.table.toString})."
getTableInfoForLogging
)
}

Expand Down Expand Up @@ -332,7 +345,7 @@ case class DeltaSharingSource(
): Unit = {
synchronized {
logInfo(s"Refreshing sortedFetchedFiles(size: ${sortedFetchedFiles.size}) with newIdToUrl(" +
s"size: ${newIdToUrl.size}), for table(id:$tableId, name:${deltaLog.table.toString}).")
s"size: ${newIdToUrl.size})," + getTableInfoForLogging + getQueryIdForLogging)
lastQueryTableTimestamp = queryTimestamp
minUrlExpirationTimestamp = newMinUrlExpiration
if (!CachedTableManager.INSTANCE.isValidUrlExpirationTime(minUrlExpirationTimestamp)) {
Expand All @@ -351,7 +364,7 @@ case class DeltaSharingSource(
val newUrl = newIdToUrl.getOrElse(
indexedFile.add.id,
throw new IllegalStateException(s"cannot find url for id ${indexedFile.add.id} " +
s"when refreshing table ${deltaLog.path}")
s"when refreshing table ${deltaLog.path}," + getTableInfoForLogging)
)
indexedFile.add.copy(url = newUrl)
},
Expand All @@ -362,7 +375,7 @@ case class DeltaSharingSource(
val newUrl = newIdToUrl.getOrElse(
indexedFile.remove.id,
throw new IllegalStateException(s"cannot find url for id ${indexedFile.remove.id} " +
s"when refreshing table ${deltaLog.path}")
s"when refreshing table ${deltaLog.path}," + getTableInfoForLogging)
)
indexedFile.remove.copy(url = newUrl)
},
Expand All @@ -373,7 +386,7 @@ case class DeltaSharingSource(
val newUrl = newIdToUrl.getOrElse(
indexedFile.cdc.id,
throw new IllegalStateException(s"cannot find url for id ${indexedFile.cdc.id} " +
s"when refreshing table ${deltaLog.path}")
s"when refreshing table ${deltaLog.path}," + getTableInfoForLogging)
)
indexedFile.cdc.copy(url = newUrl)
},
Expand All @@ -382,7 +395,7 @@ case class DeltaSharingSource(
)
}
logInfo(s"Refreshed ${numUrlsRefreshed} urls in sortedFetchedFiles(size: " +
s"${sortedFetchedFiles.size}).")
s"${sortedFetchedFiles.size})," + getTableInfoForLogging)
}
}

Expand All @@ -408,8 +421,8 @@ case class DeltaSharingSource(
isStartingVersion: Boolean,
endingVersionForQuery: Long): Unit = {
logInfo(s"Fetching files with fromVersion($fromVersion), fromIndex($fromIndex), " +
s"isStartingVersion($isStartingVersion), endingVersionForQuery($endingVersionForQuery), " +
s"for table(id:$tableId, name:${deltaLog.table.toString})."
s"isStartingVersion($isStartingVersion), endingVersionForQuery($endingVersionForQuery)," +
getTableInfoForLogging
)
resetGlobalTimestamp()
if (isStartingVersion) {
Expand Down Expand Up @@ -456,7 +469,7 @@ case class DeltaSharingSource(
val numFiles = tableFiles.files.size
logInfo(
s"Fetched ${numFiles} files for table version ${tableFiles.version} from" +
" delta sharing server."
s" delta sharing server," + getTableInfoForLogging + getQueryIdForLogging
)
tableFiles.files.sortWith(fileActionCompareFunc).zipWithIndex.foreach {
case (file, index) if (index > fromIndex) =>
Expand Down Expand Up @@ -510,11 +523,13 @@ case class DeltaSharingSource(

TableRefreshResult(idToUrl, minUrlExpiration, None)
}
val allAddFiles = validateCommitAndFilterAddFiles(tableFiles).groupBy(a => a.version)
val filteredAddFiles = validateCommitAndFilterAddFiles(tableFiles)
val allAddFiles = filteredAddFiles.groupBy(a => a.version)
logInfo(
s"Fetched and filtered ${allAddFiles.size} files from startingVersion " +
s"Fetched ${tableFiles.addFiles.size} files, filtered ${filteredAddFiles.size} " +
s"in ${allAddFiles.size} versions from startingVersion " +
s"${fromVersion} to endingVersion ${endingVersionForQuery} from " +
"delta sharing server."
s"delta sharing server," + getTableInfoForLogging + getQueryIdForLogging
)
for (v <- fromVersion to endingVersionForQuery) {
val vAddFiles = allAddFiles.getOrElse(v, ArrayBuffer[AddFileForCDF]())
Expand Down Expand Up @@ -555,7 +570,7 @@ case class DeltaSharingSource(
endingVersionForQuery: Long): Unit = {
logInfo(s"Fetching CDF files with fromVersion($fromVersion), fromIndex($fromIndex), " +
s"endingVersionForQuery($endingVersionForQuery), " +
s"for table(id:$tableId, name:${deltaLog.table.toString}).")
s"endingVersionForQuery($endingVersionForQuery)," + getTableInfoForLogging)
resetGlobalTimestamp()
val tableFiles = deltaLog.client.getCDFFiles(
deltaLog.table,
Expand Down Expand Up @@ -824,7 +839,7 @@ case class DeltaSharingSource(
case cdf: AddCDCFile => cdfFiles.append(cdf)
case add: AddFileForCDF => addFiles.append(add)
case remove: RemoveFile => removeFiles.append(remove)
case f => throw new IllegalStateException(s"Unexpected File:${f}")
case f => throw new IllegalStateException(s"Unexpected File:${f}," + getTableInfoForLogging)
}
}

Expand Down Expand Up @@ -1000,8 +1015,8 @@ case class DeltaSharingSource(
}

override def getBatch(startOffsetOption: Option[Offset], end: Offset): DataFrame = {
logInfo(s"getBatch with startOffsetOption($startOffsetOption) and end($end), " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
logInfo(s"getBatch with startOffsetOption($startOffsetOption) and end($end)," +
getTableInfoForLogging)
val endOffset = DeltaSharingSourceOffset(tableId, end)

val (startVersion, startIndex, isStartingVersion, startSourceVersion) = if (
Expand All @@ -1028,7 +1043,7 @@ case class DeltaSharingSource(
val startOffset = DeltaSharingSourceOffset(tableId, startOffsetOption.get)
if (startOffset == endOffset) {
logInfo(s"startOffset($startOffset) is the same as endOffset($endOffset) in getBatch, " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
getTableInfoForLogging)
previousOffset = endOffset
// This happens only if we recover from a failure and `MicroBatchExecution` tries to call
// us with the previous offsets. The returned DataFrame will be dropped immediately, so we
Expand Down Expand Up @@ -1128,7 +1143,7 @@ case class DeltaSharingSource(
} else if (options.startingTimestamp.isDefined) {
val version = deltaLog.client.getTableVersion(deltaLog.table, options.startingTimestamp)
logInfo(s"Got table version $version for timestamp ${options.startingTimestamp} " +
s"from Delta Sharing Server.")
s"from Delta Sharing Server," + getTableInfoForLogging)
Some(version)
} else {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ class DeltaSharingRestClientDeltaSuite extends DeltaSharingIntegrationTest {
val httpRequest = new HttpGet("random_url")

val client = new DeltaSharingRestClient(testProfileProvider)
var h = client.prepareHeaders(httpRequest).getFirstHeader(DeltaSharingRestClient.DELTA_SHARING_CAPABILITIES_HEADER)
var h = client.prepareHeaders(httpRequest, setIncludeEndStreamAction = false).getFirstHeader(DeltaSharingRestClient.DELTA_SHARING_CAPABILITIES_HEADER)
// scalastyle:off caselocale
assert(h.getValue.toLowerCase().contains(s"responseformat=${DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET}"))

val deltaClient = new DeltaSharingRestClient(
testProfileProvider,
responseFormat = DeltaSharingRestClient.RESPONSE_FORMAT_DELTA
)
h = deltaClient.prepareHeaders(httpRequest).getFirstHeader(DeltaSharingRestClient.DELTA_SHARING_CAPABILITIES_HEADER)
h = deltaClient.prepareHeaders(httpRequest, setIncludeEndStreamAction = false).getFirstHeader(DeltaSharingRestClient.DELTA_SHARING_CAPABILITIES_HEADER)
// scalastyle:off caselocale
assert(h.getValue.toLowerCase().contains(s"responseformat=${DeltaSharingRestClient.RESPONSE_FORMAT_DELTA}"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,36 +84,66 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
assert(h.contains(" java/"))
}

def checkDeltaSharingCapabilities(request: HttpRequestBase, expected: String): Unit = {
// Builds the capabilities-header fragment for the end-stream-action flag:
// ";<DELTA_SHARING_INCLUDE_END_STREAM_ACTION>=true" when enabled, an empty string otherwise.
def getEndStreamActionHeader(endStreamActionEnabled: Boolean): String =
  if (endStreamActionEnabled) s";$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true" else ""

// Asserts that the capabilities header on `request` equals
// "<RESPONSE_FORMAT>=<responseFormat><readerFeatures><end-stream fragment>".
// `readerFeatures` is appended verbatim, so callers pass it with its leading ";" (or "").
def checkDeltaSharingCapabilities(
    request: HttpRequestBase,
    responseFormat: String,
    readerFeatures: String,
    endStreamActionEnabled: Boolean): Unit = {
  val endStreamFragment = getEndStreamActionHeader(endStreamActionEnabled)
  val expectedValue = s"${RESPONSE_FORMAT}=$responseFormat$readerFeatures" + endStreamFragment
  val capabilitiesHeader = request.getFirstHeader(DELTA_SHARING_CAPABILITIES_HEADER)
  assert(capabilitiesHeader.getValue == expectedValue)
}

var httpRequestBase = new DeltaSharingRestClient(
testProfileProvider, forStreaming = false, readerFeatures = "willBeIgnored").prepareHeaders(httpRequest)
checkUserAgent(httpRequestBase, false)
checkDeltaSharingCapabilities(httpRequestBase, "responseformat=parquet")
Seq(
(true, true),
(true, false),
(false, true),
(false, false)
).foreach { case (forStreaming, endStreamActionEnabled) =>
val httpRequest = new HttpGet("random_url")
var request = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = forStreaming,
endStreamActionEnabled = endStreamActionEnabled,
readerFeatures = "willBeIgnored")
.prepareHeaders(httpRequest, setIncludeEndStreamAction = endStreamActionEnabled)
checkUserAgent(request, forStreaming)
checkDeltaSharingCapabilities(request, "parquet", "", endStreamActionEnabled)

val readerFeatures = "deletionVectors,columnMapping,timestampNTZ"
httpRequestBase = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = true,
responseFormat = RESPONSE_FORMAT_DELTA,
readerFeatures = readerFeatures).prepareHeaders(httpRequest)
checkUserAgent(httpRequestBase, true)
checkDeltaSharingCapabilities(
httpRequestBase, s"responseformat=delta;readerfeatures=$readerFeatures"
)
val readerFeatures = "deletionVectors,columnMapping,timestampNTZ"
request = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = forStreaming,
endStreamActionEnabled = endStreamActionEnabled,
responseFormat = RESPONSE_FORMAT_DELTA,
readerFeatures = readerFeatures)
.prepareHeaders(httpRequest, setIncludeEndStreamAction = endStreamActionEnabled)
checkUserAgent(request, forStreaming)
checkDeltaSharingCapabilities(
request, "delta", s";$READER_FEATURES=$readerFeatures", endStreamActionEnabled
)

httpRequestBase = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = true,
responseFormat = s"$RESPONSE_FORMAT_DELTA,$RESPONSE_FORMAT_PARQUET",
readerFeatures = readerFeatures).prepareHeaders(httpRequest)
checkUserAgent(httpRequestBase, true)
checkDeltaSharingCapabilities(
httpRequestBase, s"responseformat=delta,parquet;readerfeatures=$readerFeatures"
)
request = new DeltaSharingRestClient(
testProfileProvider,
forStreaming = forStreaming,
endStreamActionEnabled = endStreamActionEnabled,
responseFormat = s"$RESPONSE_FORMAT_DELTA,$RESPONSE_FORMAT_PARQUET",
readerFeatures = readerFeatures)
.prepareHeaders(httpRequest, setIncludeEndStreamAction = endStreamActionEnabled)
checkUserAgent(request, forStreaming)
checkDeltaSharingCapabilities(
request, s"delta,parquet", s";$READER_FEATURES=$readerFeatures", endStreamActionEnabled
)
}
}

integrationTest("listAllTables") {
Expand Down Expand Up @@ -1007,7 +1037,8 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
false
)
}.getMessage
assert(errorMessage.contains("""400 Bad Request {"errorCode":"RESOURCE_DOES_NOT_EXIST""""))
assert(errorMessage.contains("""400 Bad Request for query"""))
assert(errorMessage.contains("""{"errorCode":"RESOURCE_DOES_NOT_EXIST""""))
assert(errorMessage.contains("table files missing"))
} finally {
client.close()
Expand Down
Loading

0 comments on commit 1832fd8

Please sign in to comment.