Skip to content

Commit

Permalink
Backport additional logging to 0.7
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db committed Oct 11, 2024
1 parent a68fe09 commit 1106639
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ private[sharing] trait DeltaSharingClient {

protected var dsQueryId: Option[String] = None

def getQueryId: String = {
dsQueryId.getOrElse("dsQueryIdNotSet")
}

protected def getDsQueryIdForLogging: String = {
s" for query($dsQueryId)."
}
Expand Down Expand Up @@ -113,6 +117,9 @@ private[spark] class DeltaSharingRestClient(
forStreaming: Boolean = false,
endStreamActionEnabled: Boolean = true
) extends DeltaSharingClient with Logging {

logInfo(s"DeltaSharingRestClient with endStreamActionEnabled: $endStreamActionEnabled.")

import DeltaSharingRestClient._

@volatile private var created = false
Expand Down Expand Up @@ -214,7 +221,7 @@ private[spark] class DeltaSharingRestClient(
)
version.getOrElse {
throw new IllegalStateException(s"Cannot find $RESPONSE_TABLE_VERSION_HEADER_KEY in the " +
s"header")
"header," + getDsQueryIdForLogging)
}
}

Expand All @@ -231,7 +238,8 @@ private[spark] class DeltaSharingRestClient(
checkProtocol(protocol)
val metadata = JsonUtils.fromJson[SingleAction](lines(1)).metaData
if (lines.size != 2) {
throw new IllegalStateException("received more than two lines")
throw new IllegalStateException(s"received more than two lines:${lines.size}," +
getDsQueryIdForLogging)
}
DeltaTableMetadata(version, protocol, metadata)
}
Expand All @@ -240,7 +248,8 @@ private[spark] class DeltaSharingRestClient(
if (protocol.minReaderVersion > DeltaSharingRestClient.CURRENT) {
throw new IllegalArgumentException(s"The table requires a newer version" +
s" ${protocol.minReaderVersion} to read. But the current release supports version " +
s"is ${DeltaSharingProfile.CURRENT} and below. Please upgrade to a newer release.")
s"is ${DeltaSharingProfile.CURRENT} and below. Please upgrade to a newer release." +
getDsQueryIdForLogging)
}
}

Expand Down Expand Up @@ -281,7 +290,7 @@ private[spark] class DeltaSharingRestClient(
}
}
if (includeRefreshToken && refreshTokenOpt.isEmpty) {
logWarning("includeRefreshToken=true but refresh token is not returned.")
logWarning("includeRefreshToken=true but refresh token is not returned " + getQueryIdString)
}
require(versionAsOf.isEmpty || versionAsOf.get == version)
val protocol = JsonUtils.fromJson[SingleAction](filteredLines(0)).protocol
Expand All @@ -293,7 +302,7 @@ private[spark] class DeltaSharingRestClient(
if (action.file != null) {
files.append(action.file)
} else {
throw new IllegalStateException(s"Unexpected Line:${line}")
throw new IllegalStateException(s"Unexpected Line:${line}" + getDsQueryIdForLogging)
}
}
DeltaTableFiles(version, protocol, metadata, files.toSeq, refreshToken = refreshTokenOpt)
Expand Down Expand Up @@ -337,7 +346,8 @@ private[spark] class DeltaSharingRestClient(
case a: AddFileForCDF => addFiles.append(a)
case r: RemoveFile => removeFiles.append(r)
case m: Metadata => additionalMetadatas.append(m)
case _ => throw new IllegalStateException(s"Unexpected Line:${line}")
case _ => throw new IllegalStateException(s"Unexpected Line:${line}" +
getDsQueryIdForLogging)
}
}
DeltaTableFiles(
Expand Down Expand Up @@ -380,7 +390,8 @@ private[spark] class DeltaSharingRestClient(
case a: AddFileForCDF => addFiles.append(a)
case r: RemoveFile => removeFiles.append(r)
case m: Metadata => additionalMetadatas.append(m)
case _ => throw new IllegalStateException(s"Unexpected Line:${line}")
case _ => throw new IllegalStateException(s"Unexpected Line:${line}" +
getDsQueryIdForLogging)
}
}
DeltaTableFiles(
Expand Down Expand Up @@ -428,7 +439,7 @@ private[spark] class DeltaSharingRestClient(
version.getOrElse {
if (requireVersion) {
throw new IllegalStateException(s"Cannot find $RESPONSE_TABLE_VERSION_HEADER_KEY in the " +
s"header")
s"header," + getDsQueryIdForLogging)
} else {
0L
}
Expand All @@ -448,7 +459,7 @@ private[spark] class DeltaSharingRestClient(
)
version.getOrElse {
throw new IllegalStateException(s"Cannot find $RESPONSE_TABLE_VERSION_HEADER_KEY in the " +
s"header")
s"header," + getDsQueryIdForLogging)
} -> lines
}

Expand All @@ -461,7 +472,7 @@ private[spark] class DeltaSharingRestClient(
)
if (response.size != 1) {
throw new IllegalStateException(
"Unexpected response for target: " + target + ", response=" + response
s"Unexpected response for target:$target, response=$response" + getDsQueryIdForLogging
)
}
JsonUtils.fromJson[R](response(0))
Expand Down Expand Up @@ -557,7 +568,8 @@ private[spark] class DeltaSharingRestClient(
}
} catch {
case e: org.apache.http.ConnectionClosedException =>
val error = s"Request to delta sharing server failed due to ${e}."
val error = s"Request to delta sharing server failed$getDsQueryIdForLogging" +
s" due to ${e}."
logError(error)
lineBuffer += error
lineBuffer.toList
Expand All @@ -577,7 +589,8 @@ private[spark] class DeltaSharingRestClient(
// Only show the last 100 lines in the error to keep it contained.
val responseToShow = lines.drop(lines.size - 100).mkString("\n")
throw new UnexpectedHttpStatus(
s"HTTP request failed with status: $status $responseToShow. $additionalErrorInfo",
s"HTTP request failed with status: $status" +
Seq(getDsQueryIdForLogging, additionalErrorInfo, responseToShow).mkString(" "),
statusCode)
}
if (setIncludeEndStreamAction) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.delta.sharing.spark

// scalastyle:off import.ordering.noEmptyLine
import java.lang.ref.WeakReference
import java.util.UUID

import scala.collection.mutable.ArrayBuffer

Expand Down Expand Up @@ -111,6 +112,8 @@ case class DeltaSharingSource(
assert(deltaLog.client.getForStreaming,
"forStreaming must be true for client in DeltaSharingSource.")

private val sourceId = Some(UUID.randomUUID().toString().split('-').head)

// The snapshot that's used to construct the dataframe, constructed when source is initialized.
// Use latest snapshot instead of snapshot at startingVersion, to allow easy recovery from
// failures on schema incompatibility.
Expand Down Expand Up @@ -159,9 +162,11 @@ case class DeltaSharingSource(
val interval = 30000.max(
ConfUtils.streamingQueryTableVersionIntervalSeconds(spark.sessionState.conf) * 1000
)
logInfo(s"Configured queryTableVersionIntervalMilliSeconds:${interval}," +
getTableInfoForLogging)
if (interval < 30000) {
throw new IllegalArgumentException("QUERY_TABLE_VERSION_INTERVAL_MILLIS must not be less " +
"than 30 seconds.")
throw new IllegalArgumentException(s"QUERY_TABLE_VERSION_INTERVAL_MILLIS($interval) must " +
"not be less than 30 seconds." + getTableInfoForLogging)
}
interval
}
Expand All @@ -176,6 +181,13 @@ case class DeltaSharingSource(
TableRefreshResult(Map.empty[String, String], None, None)
}

private lazy val getTableInfoForLogging: String =
s" for table(id:$tableId, name:${deltaLog.table.toString}, source:$sourceId)"

private def getQueryIdForLogging: String = {
s", with queryId(${deltaLog.client.getQueryId})"
}

// Check the latest table version from the delta sharing server through the client.getTableVersion
// RPC. Adding a minimum interval of QUERY_TABLE_VERSION_INTERVAL_MILLIS between two consecutive
// rpcs to avoid traffic jam on the delta sharing server.
Expand All @@ -184,13 +196,14 @@ case class DeltaSharingSource(
if (lastGetVersionTimestamp == -1 ||
(currentTimeMillis - lastGetVersionTimestamp) >= QUERY_TABLE_VERSION_INTERVAL_MILLIS) {
val serverVersion = deltaLog.client.getTableVersion(deltaLog.table)
logInfo(s"Got table version $serverVersion from Delta Sharing Server.")
logInfo(s"Got table version $serverVersion from Delta Sharing Server," +
getTableInfoForLogging)
if (serverVersion < 0) {
throw new IllegalStateException(s"Delta Sharing Server returning negative table version:" +
s"$serverVersion.")
s"$serverVersion," + getTableInfoForLogging)
} else if (serverVersion < latestTableVersion) {
logWarning(s"Delta Sharing Server returning smaller table version:$serverVersion < " +
s"$latestTableVersion.")
s"$latestTableVersion," + getTableInfoForLogging)
}
latestTableVersion = serverVersion
lastGetVersionTimestamp = currentTimeMillis
Expand Down Expand Up @@ -255,8 +268,8 @@ case class DeltaSharingSource(
logWarning(s"The asked file(" +
s"$fromVersion, $fromIndex, $isStartingVersion) is not included in sortedFetchedFiles[" +
s"(${headFile.version}, ${headFile.index}, ${headFile.isSnapshot}) to " +
s"(${lastFile.version}, ${lastFile.index}, ${lastFile.isSnapshot})], " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
s"(${lastFile.version}, ${lastFile.index}, ${lastFile.isSnapshot})]," +
getTableInfoForLogging)
sortedFetchedFiles = Seq.empty
} else {
return
Expand All @@ -274,8 +287,7 @@ case class DeltaSharingSource(
if (endingVersionForQuery < currentLatestVersion) {
logInfo(s"Reducing ending version for delta sharing rpc from currentLatestVersion(" +
s"$currentLatestVersion) to endingVersionForQuery($endingVersionForQuery), fromVersion:" +
s"$fromVersion, maxVersionsPerRpc:$maxVersionsPerRpc, " +
s"for table(id:$tableId, name:${deltaLog.table.toString})."
s"$fromVersion, maxVersionsPerRpc:$maxVersionsPerRpc," + getTableInfoForLogging
)
}

Expand Down Expand Up @@ -342,7 +354,7 @@ case class DeltaSharingSource(
): Unit = {
synchronized {
logInfo(s"Refreshing sortedFetchedFiles(size: ${sortedFetchedFiles.size}) with newIdToUrl(" +
s"size: ${newIdToUrl.size}), for table(id:$tableId, name:${deltaLog.table.toString}).")
s"size: ${newIdToUrl.size})," + getTableInfoForLogging + getQueryIdForLogging)
lastQueryTableTimestamp = queryTimestamp
minUrlExpirationTimestamp = newMinUrlExpiration
if (!CachedTableManager.INSTANCE.isValidUrlExpirationTime(minUrlExpirationTimestamp)) {
Expand All @@ -361,7 +373,7 @@ case class DeltaSharingSource(
val newUrl = newIdToUrl.getOrElse(
indexedFile.add.id,
throw new IllegalStateException(s"cannot find url for id ${indexedFile.add.id} " +
s"when refreshing table ${deltaLog.path}")
s"when refreshing table ${deltaLog.path}," + getTableInfoForLogging)
)
indexedFile.add.copy(url = newUrl)
},
Expand All @@ -372,7 +384,7 @@ case class DeltaSharingSource(
val newUrl = newIdToUrl.getOrElse(
indexedFile.remove.id,
throw new IllegalStateException(s"cannot find url for id ${indexedFile.remove.id} " +
s"when refreshing table ${deltaLog.path}")
s"when refreshing table ${deltaLog.path}," + getTableInfoForLogging)
)
indexedFile.remove.copy(url = newUrl)
},
Expand All @@ -383,7 +395,7 @@ case class DeltaSharingSource(
val newUrl = newIdToUrl.getOrElse(
indexedFile.cdc.id,
throw new IllegalStateException(s"cannot find url for id ${indexedFile.cdc.id} " +
s"when refreshing table ${deltaLog.path}")
s"when refreshing table ${deltaLog.path}," + getTableInfoForLogging)
)
indexedFile.cdc.copy(url = newUrl)
},
Expand All @@ -392,7 +404,7 @@ case class DeltaSharingSource(
)
}
logInfo(s"Refreshed ${numUrlsRefreshed} urls in sortedFetchedFiles(size: " +
s"${sortedFetchedFiles.size}).")
s"${sortedFetchedFiles.size})," + getTableInfoForLogging)
}
}

Expand All @@ -418,8 +430,8 @@ case class DeltaSharingSource(
isStartingVersion: Boolean,
endingVersionForQuery: Long): Unit = {
logInfo(s"Fetching files with fromVersion($fromVersion), fromIndex($fromIndex), " +
s"isStartingVersion($isStartingVersion), endingVersionForQuery($endingVersionForQuery), " +
s"for table(id:$tableId, name:${deltaLog.table.toString})."
s"isStartingVersion($isStartingVersion), endingVersionForQuery($endingVersionForQuery)," +
getTableInfoForLogging
)
resetGlobalTimestamp()
if (isStartingVersion) {
Expand Down Expand Up @@ -466,7 +478,7 @@ case class DeltaSharingSource(
val numFiles = tableFiles.files.size
logInfo(
s"Fetched ${numFiles} files for table version ${tableFiles.version} from" +
" delta sharing server."
s" delta sharing server," + getTableInfoForLogging + getQueryIdForLogging
)
tableFiles.files.sortWith(fileActionCompareFunc).zipWithIndex.foreach {
case (file, index) if (index > fromIndex) =>
Expand Down Expand Up @@ -520,11 +532,13 @@ case class DeltaSharingSource(

TableRefreshResult(idToUrl, minUrlExpiration, None)
}
val allAddFiles = validateCommitAndFilterAddFiles(tableFiles).groupBy(a => a.version)
val filteredAddFiles = validateCommitAndFilterAddFiles(tableFiles)
val allAddFiles = filteredAddFiles.groupBy(a => a.version)
logInfo(
s"Fetched and filtered ${allAddFiles.size} files from startingVersion " +
s"Fetched ${tableFiles.addFiles.size} files, filtered ${filteredAddFiles.size} " +
s"files in ${allAddFiles.size} versions from startingVersion " +
s"${fromVersion} to endingVersion ${endingVersionForQuery} from " +
"delta sharing server."
s"delta sharing server," + getTableInfoForLogging + getQueryIdForLogging
)
for (v <- fromVersion to endingVersionForQuery) {
val vAddFiles = allAddFiles.getOrElse(v, ArrayBuffer[AddFileForCDF]())
Expand Down Expand Up @@ -564,8 +578,7 @@ case class DeltaSharingSource(
fromIndex: Long,
endingVersionForQuery: Long): Unit = {
logInfo(s"Fetching CDF files with fromVersion($fromVersion), fromIndex($fromIndex), " +
s"endingVersionForQuery($endingVersionForQuery), " +
s"for table(id:$tableId, name:${deltaLog.table.toString}).")
s"endingVersionForQuery($endingVersionForQuery)," + getTableInfoForLogging)
resetGlobalTimestamp()
val tableFiles = deltaLog.client.getCDFFiles(
deltaLog.table,
Expand Down Expand Up @@ -834,7 +847,7 @@ case class DeltaSharingSource(
case cdf: AddCDCFile => cdfFiles.append(cdf)
case add: AddFileForCDF => addFiles.append(add)
case remove: RemoveFile => removeFiles.append(remove)
case f => throw new IllegalStateException(s"Unexpected File:${f}")
case f => throw new IllegalStateException(s"Unexpected File:${f},$getTableInfoForLogging")
}
}

Expand Down Expand Up @@ -1010,8 +1023,8 @@ case class DeltaSharingSource(
}

override def getBatch(startOffsetOption: Option[Offset], end: Offset): DataFrame = {
logInfo(s"getBatch with startOffsetOption($startOffsetOption) and end($end), " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
logInfo(s"getBatch with startOffsetOption($startOffsetOption) and end($end)," +
getTableInfoForLogging)
val endOffset = DeltaSharingSourceOffset(tableId, end)

val (startVersion, startIndex, isStartingVersion, startSourceVersion) = if (
Expand All @@ -1037,8 +1050,8 @@ case class DeltaSharingSource(
} else {
val startOffset = DeltaSharingSourceOffset(tableId, startOffsetOption.get)
if (startOffset == endOffset) {
logInfo(s"startOffset($startOffset) is the same as endOffset($endOffset) in getBatch, " +
s"for table(id:$tableId, name:${deltaLog.table.toString})")
logInfo(s"startOffset($startOffset) is the same as endOffset($endOffset) in getBatch," +
getTableInfoForLogging)
previousOffset = endOffset
// This happens only if we recover from a failure and `MicroBatchExecution` tries to call
// us with the previous offsets. The returned DataFrame will be dropped immediately, so we
Expand Down Expand Up @@ -1138,7 +1151,7 @@ case class DeltaSharingSource(
} else if (options.startingTimestamp.isDefined) {
val version = deltaLog.client.getTableVersion(deltaLog.table, options.startingTimestamp)
logInfo(s"Got table version $version for timestamp ${options.startingTimestamp} " +
s"from Delta Sharing Server.")
s"from Delta Sharing Server," + getTableInfoForLogging)
Some(version)
} else {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -810,7 +810,8 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
false
)
}.getMessage
assert(errorMessage.contains("""400 Bad Request {"errorCode":"RESOURCE_DOES_NOT_EXIST""""))
assert(errorMessage.contains("""400 Bad Request for query"""))
assert(errorMessage.contains("""{"errorCode":"RESOURCE_DOES_NOT_EXIST""""))
assert(errorMessage.contains("table files missing"))
} finally {
client.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,8 @@ class DeltaSharingSuite extends QueryTest with SharedSparkSession with DeltaShar
.option("startingVersion", 0).load(tablePath)
checkAnswer(df, Nil)
}
assert (ex.getMessage.contains("""400 Bad Request {"errorCode":"RESOURCE_DOES_NOT_EXIST""""))
assert(ex.getMessage.contains("""400 Bad Request"""))
assert(ex.getMessage.contains("""{"errorCode":"RESOURCE_DOES_NOT_EXIST""""))
}

integrationTest("azure support") {
Expand Down

0 comments on commit 1106639

Please sign in to comment.