Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional logging in DeltaSharingClient and DeltaSharingSource #586

Merged
merged 2 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ package io.delta.sharing.client
import java.io.{BufferedReader, InputStreamReader}
import java.net.{URL, URLEncoder}
import java.nio.charset.StandardCharsets.UTF_8
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter.{ISO_DATE, ISO_DATE_TIME}
import java.util.UUID

import scala.collection.mutable.{ArrayBuffer, ListBuffer}
Expand All @@ -46,6 +43,17 @@ import io.delta.sharing.client.util.{ConfUtils, JsonUtils, RetryUtils, Unexpecte

/** An interface to fetch Delta metadata from remote server. */
trait DeltaSharingClient {

protected var dsQueryId: Option[String] = None

def getQueryId: String = {
dsQueryId.getOrElse("dsQueryIdNotSet")
}

protected def getDsQueryIdForLogging: String = {
s" for query($dsQueryId)."
}

def listAllTables(): Seq[Table]

def getTableVersion(table: Table, startingTimestamp: Option[String] = None): Long
Expand Down Expand Up @@ -198,8 +206,6 @@ class DeltaSharingRestClient(
// Convert the responseFormat to a Seq to be used later.
private val responseFormatSet = responseFormat.split(",").toSet

private var dsQueryId: Option[String] = None

private lazy val client = {
val clientBuilder: HttpClientBuilder = if (sslTrustAll) {
val sslBuilder = new SSLContextBuilder()
Expand Down Expand Up @@ -299,7 +305,7 @@ class DeltaSharingRestClient(
val (version, _, _) = getResponse(new HttpGet(target), true, true)
version.getOrElse {
throw new IllegalStateException(s"Cannot find " +
s"${RESPONSE_TABLE_VERSION_HEADER_KEY} in the header")
s"${RESPONSE_TABLE_VERSION_HEADER_KEY} in the header," + getDsQueryIdForLogging)
}
}

Expand All @@ -311,10 +317,10 @@ class DeltaSharingRestClient(
private def checkRespondedFormat(respondedFormat: String, rpc: String, table: String): Unit = {
if (!responseFormatSet.contains(respondedFormat)) {
logError(s"RespondedFormat($respondedFormat) is different from requested " +
s"responseFormat($responseFormat) for $rpc for table $table.")
s"responseFormat($responseFormat) for $rpc for table $table," + getDsQueryIdForLogging)
throw new IllegalArgumentException("The responseFormat returned from the delta sharing " +
s"server doesn't match the requested responseFormat: respondedFormat($respondedFormat)" +
s" != requestedFormat($responseFormat).")
s" != requestedFormat($responseFormat)," + getDsQueryIdForLogging)
}
}

Expand All @@ -338,8 +344,8 @@ class DeltaSharingRestClient(
table = s"${table.share}.${table.schema}.${table.name}"
)
if (response.lines.size != 2) {
throw new IllegalStateException(s"received more than two lines:${response.lines.size}, " +
s"for query($dsQueryId).")
throw new IllegalStateException(s"received more than two lines:${response.lines.size}," +
getDsQueryIdForLogging)
}

if (response.respondedFormat == RESPONSE_FORMAT_DELTA) {
Expand All @@ -365,7 +371,8 @@ class DeltaSharingRestClient(
if (protocol.minReaderVersion > DeltaSharingProfile.CURRENT) {
throw new IllegalArgumentException(s"The table requires a newer version" +
s" ${protocol.minReaderVersion} to read. But the current release supports version " +
s"is ${DeltaSharingProfile.CURRENT} and below. Please upgrade to a newer release.")
s"is ${DeltaSharingProfile.CURRENT} and below. Please upgrade to a newer release." +
getDsQueryIdForLogging)
}
}

Expand Down Expand Up @@ -440,7 +447,7 @@ class DeltaSharingRestClient(
if (action.file != null) {
files.append(action.file)
} else {
throw new IllegalStateException(s"Unexpected Line:${line}")
throw new IllegalStateException(s"Unexpected Line:${line}" + getDsQueryIdForLogging)
}
}
DeltaTableFiles(
Expand Down Expand Up @@ -481,7 +488,8 @@ class DeltaSharingRestClient(
val (version, respondedFormat, lines) = if (queryTablePaginationEnabled) {
logInfo(
s"Making paginated queryTable from version $startingVersion requests for table " +
s"${table.share}.${table.schema}.${table.name} with maxFiles=$maxFilesPerReq"
s"${table.share}.${table.schema}.${table.name} with maxFiles=$maxFilesPerReq, " +
s"for query($dsQueryId)."
)
val (version, respondedFormat, lines, _) = getFilesByPage(table, target, request)
(version, respondedFormat, lines)
Expand Down Expand Up @@ -512,7 +520,8 @@ class DeltaSharingRestClient(
case a: AddFileForCDF => addFiles.append(a)
case r: RemoveFile => removeFiles.append(r)
case m: Metadata => additionalMetadatas.append(m)
case _ => throw new IllegalStateException(s"Unexpected Line:${line}")
case _ => throw new IllegalStateException(
s"Unexpected Line:${line}" + getDsQueryIdForLogging)
}
}
DeltaTableFiles(
Expand Down Expand Up @@ -546,7 +555,7 @@ class DeltaSharingRestClient(
var (filteredLines, endStreamAction) = maybeExtractEndStreamAction(lines)
if (endStreamAction.isEmpty) {
logWarning(
s"EndStreamAction is not returned in the response for paginated query($dsQueryId)."
s"EndStreamAction is not returned in the response" + getDsQueryIdForLogging
)
}

Expand Down Expand Up @@ -599,18 +608,20 @@ class DeltaSharingRestClient(
endStreamAction = res._2
if (endStreamAction.isEmpty) {
logWarning(
s"EndStreamAction is not returned in the response for paginated query($dsQueryId)."
s"EndStreamAction is not returned in the response" + getDsQueryIdForLogging
)
}
// Throw an error if the first page is expiring before we get all pages
if (minUrlExpirationTimestamp.exists(_ <= System.currentTimeMillis())) {
throw new IllegalStateException("Unable to fetch all pages before minimum url expiration.")
throw new IllegalStateException(
"Unable to fetch all pages before minimum url expiration." + getDsQueryIdForLogging
)
}
}

// TODO: remove logging once changes are rolled out
logInfo(s"Took ${System.currentTimeMillis() - start} ms to query $numPages pages " +
s"of ${allLines.size} files")
s"of ${allLines.size} files," + getDsQueryIdForLogging)
(version, respondedFormat, allLines.toSeq, refreshToken)
}

Expand All @@ -629,7 +640,8 @@ class DeltaSharingRestClient(
// TODO: remove logging once changes are rolled out
logInfo(
s"Making paginated queryTableChanges requests for table " +
s"${table.share}.${table.schema}.${table.name} with maxFiles=$maxFilesPerReq"
s"${table.share}.${table.schema}.${table.name} with maxFiles=$maxFilesPerReq, " +
s"for query($dsQueryId)."
)
getCDFFilesByPage(target)
} else {
Expand Down Expand Up @@ -663,7 +675,8 @@ class DeltaSharingRestClient(
case a: AddFileForCDF => addFiles.append(a)
case r: RemoveFile => removeFiles.append(r)
case m: Metadata => additionalMetadatas.append(m)
case _ => throw new IllegalStateException(s"Unexpected Line:${line}")
case _ => throw new IllegalStateException(
s"Unexpected Line:${line}," + getDsQueryIdForLogging)
}
}
DeltaTableFiles(
Expand Down Expand Up @@ -691,7 +704,7 @@ class DeltaSharingRestClient(
var (filteredLines, endStreamAction) = maybeExtractEndStreamAction(response.lines)
if (endStreamAction.isEmpty) {
logWarning(
s"EndStreamAction is not returned in the response for paginated query($dsQueryId)."
s"EndStreamAction is not returned in the response" + getDsQueryIdForLogging
)
}
val protocol = filteredLines(0)
Expand Down Expand Up @@ -721,19 +734,21 @@ class DeltaSharingRestClient(
endStreamAction = res._2
if (endStreamAction.isEmpty) {
logWarning(
s"EndStreamAction is not returned in the response for paginated query($dsQueryId)."
s"EndStreamAction is not returned in the response" + getDsQueryIdForLogging
)
}
// Throw an error if the first page is expiring before we get all pages
if (minUrlExpirationTimestamp.exists(_ <= System.currentTimeMillis())) {
throw new IllegalStateException("Unable to fetch all pages before minimum url expiration.")
throw new IllegalStateException(
"Unable to fetch all pages before minimum url expiration," + getDsQueryIdForLogging
)
}
}

// TODO: remove logging once changes are rolled out
logInfo(
s"Took ${System.currentTimeMillis() - start} ms to query $numPages pages " +
s"of ${allLines.size} files"
s"of ${allLines.size} files," + getDsQueryIdForLogging
)
(response.version, response.respondedFormat, allLines.toSeq)
}
Expand All @@ -756,7 +771,7 @@ class DeltaSharingRestClient(
getNDJson(targetUrl, requireVersion = false)
}
logInfo(s"Took ${System.currentTimeMillis() - start} to fetch ${pageNumber}th page " +
s"of ${response.lines.size} lines.")
s"of ${response.lines.size} lines," + getDsQueryIdForLogging)

// Validate that version/format/protocol/metadata in the response don't change across pages
if (response.version != expectedVersion ||
Expand All @@ -768,7 +783,7 @@ class DeltaSharingRestClient(
|Received inconsistent version/format/protocol/metadata across pages.
|Expected: version $expectedVersion, $expectedRespondedFormat,
|$expectedProtocol, $expectedMetadata. Actual: version ${response.version},
|${response.respondedFormat}, ${response.lines}""".stripMargin
|${response.respondedFormat}, ${response.lines},$getDsQueryIdForLogging""".stripMargin
logError(s"Error while fetching next page files at url $targetUrl " +
s"with body(${JsonUtils.toJson(requestBody.orNull)}: $errorMsg)")
throw new IllegalStateException(errorMsg)
Expand Down Expand Up @@ -826,7 +841,7 @@ class DeltaSharingRestClient(
version = version.getOrElse {
if (requireVersion) {
throw new IllegalStateException(s"Cannot find " +
s"${RESPONSE_TABLE_VERSION_HEADER_KEY} in the header")
s"${RESPONSE_TABLE_VERSION_HEADER_KEY} in the header" + getDsQueryIdForLogging)
} else {
0L
}
Expand Down Expand Up @@ -948,7 +963,9 @@ class DeltaSharingRestClient(

val response = ParsedDeltaSharingResponse(
version = version.getOrElse {
throw new IllegalStateException("Cannot find Delta-Table-Version in the header")
throw new IllegalStateException(
"Cannot find Delta-Table-Version in the header" + getDsQueryIdForLogging
)
},
respondedFormat = respondedFormat,
includedEndStreamAction = includedEndStreamAction,
Expand All @@ -970,20 +987,20 @@ class DeltaSharingRestClient(
s"${DELTA_SHARING_END_STREAM_ACTION}=true in the " +
s"header, server responded with the header set to true(${response.capabilities}, " +
s"and ${response.lines.size} lines, and last line parsed as " +
s"${lastLineAction.unwrap.getClass()}, for query($dsQueryId).")
s"${lastLineAction.unwrap.getClass()}," + getDsQueryIdForLogging)
}
logInfo(
s"Successfully verified endStreamAction in the response for query($dsQueryId)."
s"Successfully verified endStreamAction in the response" + getDsQueryIdForLogging
)
case Some(false) =>
logWarning(s"Client sets ${DELTA_SHARING_END_STREAM_ACTION}=true in the " +
s"header, but the server responded with the header set to false(" +
s"${response.capabilities}), for query($dsQueryId)."
s"${response.capabilities})," + getDsQueryIdForLogging
)
case None =>
logWarning(s"Client sets ${DELTA_SHARING_END_STREAM_ACTION}=true in the" +
s" header, but server didn't respond with the header(${response.capabilities}) " +
s"for query($dsQueryId)."
s" header, but server didn't respond with the header(${response.capabilities}), " +
s"for query($dsQueryId)."
)
}
}
Expand Down Expand Up @@ -1015,7 +1032,7 @@ class DeltaSharingRestClient(
val (_, _, response) = getResponse(new HttpGet(target), false, true)
if (response.size != 1) {
throw new IllegalStateException(
"Unexpected response for target: " + target + ", response=" + response
s"Unexpected response for target:$target, response=$response" + getDsQueryIdForLogging
)
}
JsonUtils.fromJson[R](response(0))
Expand Down Expand Up @@ -1103,7 +1120,8 @@ class DeltaSharingRestClient(
}
} catch {
case e: org.apache.http.ConnectionClosedException =>
val error = s"Request to delta sharing server failed due to ${e}."
val error = s"Request to delta sharing server failed for query($dsQueryId) " +
s"due to ${e}."
logError(error)
lineBuffer += error
lineBuffer.toList
Expand All @@ -1123,7 +1141,8 @@ class DeltaSharingRestClient(
// Only show the last 100 lines in the error to keep it contained.
val responseToShow = lines.drop(lines.size - 100).mkString("\n")
throw new UnexpectedHttpStatus(
s"HTTP request failed with status: $status $responseToShow. $additionalErrorInfo",
s"HTTP request failed with status: $status $responseToShow. $additionalErrorInfo"
+ getDsQueryIdForLogging,
statusCode)
}
(
Expand Down Expand Up @@ -1166,7 +1185,7 @@ class DeltaSharingRestClient(
}

if (enableAsyncQuery) {
capabilities = capabilities :+ s"$DELTA_SHARING_CAPABILITIES_ASYNC_READ=true"
capabilities = capabilities :+ s"$DELTA_SHARING_CAPABILITIES_ASYNC_READ=true"
}

if (includeEndStreamAction) {
Expand Down
Loading
Loading