Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only Check includeEndStreamAction in response header when needed #591

Merged
merged 19 commits into from
Oct 9, 2024
Merged
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,15 @@ case class ParsedDeltaSharingTablePath(
*
* @param version the table version of the shared table.
* @param respondedFormat the sharing format (parquet or delta), used to parse the lines.
* @param includedEndStreamAction whether the last line is required to be an EndStreamAction, parsed
* from the response header.
* @param includeEndStreamActionHeader whether the last line is required to be an EndStreamAction,
* parsed from the response header.
* @param lines all lines in the response.
* @param capabilities value of delta-sharing-capabilities in the response header
*/
case class ParsedDeltaSharingResponse(
version: Long,
respondedFormat: String,
includedEndStreamAction: Option[Boolean],
includeEndStreamActionHeader: Option[Boolean],
lines: Seq[String],
capabilities: Option[String])

Expand Down Expand Up @@ -336,7 +336,7 @@ class DeltaSharingRestClient(
val target = getTargetUrl(
s"/shares/$encodedShareName/schemas/$encodedSchemaName/tables/$encodedTableName/metadata" +
s"$encodedParams")
val response = getNDJson(target)
val response = getNDJson(target, requireVersion = true, checkEndStreamActionHeader = false)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: do we set includeendstreamaction for metadata queries?
If do, would it be a problem if some DS server returns endStreamAction for it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense, updated to not set includeendstreamaction for table version and metadata queries.


checkRespondedFormat(
response.respondedFormat,
Expand Down Expand Up @@ -494,7 +494,9 @@ class DeltaSharingRestClient(
val (version, respondedFormat, lines, _) = getFilesByPage(table, target, request)
(version, respondedFormat, lines)
} else {
val response = getNDJson(target, request)
val response = getNDJsonPost(
target = target, data = request, checkEndStreamActionHeader = true
)
val (filteredLines, _) = maybeExtractEndStreamAction(response.lines)
(response.version, response.respondedFormat, filteredLines)
}
Expand Down Expand Up @@ -548,7 +550,9 @@ class DeltaSharingRestClient(
val (version, respondedFormat, lines, queryIdOpt) = if (enableAsyncQuery) {
getNDJsonWithAsync(table, targetUrl, request)
} else {
val response = getNDJson(targetUrl, request)
val response = getNDJsonPost(
target = targetUrl, data = request, checkEndStreamActionHeader = true
)
(response.version, response.respondedFormat, response.lines, None)
}

Expand Down Expand Up @@ -602,7 +606,9 @@ class DeltaSharingRestClient(
expectedRespondedFormat = respondedFormat,
expectedProtocol = protocol,
expectedMetadata = metadata,
pageNumber = numPages
pageNumber = numPages,
// EndStreamAction is not supported for async queries yet.
checkEndStreamActionHeader = !enableAsyncQuery
)
allLines.appendAll(res._1)
endStreamAction = res._2
Expand Down Expand Up @@ -645,7 +651,7 @@ class DeltaSharingRestClient(
)
getCDFFilesByPage(target)
} else {
val response = getNDJson(target, requireVersion = false)
val response = getNDJson(target, requireVersion = false, checkEndStreamActionHeader = true)
val (filteredLines, _) = maybeExtractEndStreamAction(response.lines)
(response.version, response.respondedFormat, filteredLines)
}
Expand Down Expand Up @@ -700,7 +706,7 @@ class DeltaSharingRestClient(

// Fetch first page
var updatedUrl = s"$targetUrl&maxFiles=$maxFilesPerReq"
val response = getNDJson(updatedUrl, requireVersion = false)
val response = getNDJson(updatedUrl, requireVersion = false, checkEndStreamActionHeader = true)
var (filteredLines, endStreamAction) = maybeExtractEndStreamAction(response.lines)
if (endStreamAction.isEmpty) {
logWarning(
Expand Down Expand Up @@ -728,7 +734,8 @@ class DeltaSharingRestClient(
expectedRespondedFormat = response.respondedFormat,
expectedProtocol = protocol,
expectedMetadata = metadata,
pageNumber = numPages
pageNumber = numPages,
checkEndStreamActionHeader = true
)
allLines.appendAll(res._1)
endStreamAction = res._2
Expand Down Expand Up @@ -763,12 +770,20 @@ class DeltaSharingRestClient(
expectedRespondedFormat: String,
expectedProtocol: String,
expectedMetadata: String,
pageNumber: Int): (Seq[String], Option[EndStreamAction]) = {
pageNumber: Int,
checkEndStreamActionHeader: Boolean): (Seq[String], Option[EndStreamAction]) = {
val start = System.currentTimeMillis()
val response = if (requestBody.isDefined) {
getNDJson(targetUrl, requestBody.get)
getNDJsonPost(
target = targetUrl,
data = requestBody.get,
checkEndStreamActionHeader = checkEndStreamActionHeader
)
} else {
getNDJson(targetUrl, requireVersion = false)
getNDJson(
target = targetUrl,
requireVersion = false,
checkEndStreamActionHeader = checkEndStreamActionHeader)
}
logInfo(s"Took ${System.currentTimeMillis() - start} to fetch ${pageNumber}th page " +
s"of ${response.lines.size} lines," + getDsQueryIdForLogging)
Expand Down Expand Up @@ -833,9 +848,11 @@ class DeltaSharingRestClient(
}

private def getNDJson(
target: String, requireVersion: Boolean = true): ParsedDeltaSharingResponse = {
target: String,
requireVersion: Boolean,
checkEndStreamActionHeader: Boolean): ParsedDeltaSharingResponse = {
val (version, capabilities, lines) = getResponse(new HttpGet(target))
val (respondedFormat, includedEndStreamAction) = getRespondedHeaders(capabilities)
val (respondedFormat, includeEndStreamActionHeader) = getRespondedHeaders(capabilities)

val response = ParsedDeltaSharingResponse(
version = version.getOrElse {
Expand All @@ -847,11 +864,13 @@ class DeltaSharingRestClient(
}
},
respondedFormat = respondedFormat,
includedEndStreamAction = includedEndStreamAction,
includeEndStreamActionHeader = includeEndStreamActionHeader,
lines,
capabilities = capabilities
)
checkEndStreamAction(response)
if (checkEndStreamActionHeader) {
checkEndStreamAction(response)
}
response
}

Expand All @@ -876,7 +895,9 @@ class DeltaSharingRestClient(
maxFiles = maxFiles,
pageToken = pageToken)

val response = getNDJson(target, request)
val response = getNDJsonPost(
target = target, data = request, checkEndStreamActionHeader = false
)
(response.version, response.respondedFormat, response.lines)
}

Expand Down Expand Up @@ -915,7 +936,9 @@ class DeltaSharingRestClient(
target: String,
request: QueryTableRequest): (Long, String, Seq[String], Option[String]) = {
// Initial query to get NDJson data
val response = getNDJson(target, request)
val response = getNDJsonPost(
target = target, data = request, checkEndStreamActionHeader = false
)

// Check if the query is still pending
var (lines, queryIdOpt, queryPending) = checkQueryPending(response.lines)
Expand Down Expand Up @@ -953,13 +976,16 @@ class DeltaSharingRestClient(
(version, respondedFormat, lines, queryIdOpt)
}

private def getNDJson[T: Manifest](target: String, data: T): ParsedDeltaSharingResponse = {
private def getNDJsonPost[T: Manifest](
target: String,
data: T,
checkEndStreamActionHeader: Boolean): ParsedDeltaSharingResponse = {
val httpPost = new HttpPost(target)
val json = JsonUtils.toJson(data)
httpPost.setHeader("Content-type", "application/json")
httpPost.setEntity(new StringEntity(json, UTF_8))
val (version, capabilities, lines) = getResponse(httpPost)
val (respondedFormat, includedEndStreamAction) = getRespondedHeaders(capabilities)
val (respondedFormat, includeEndStreamActionHeader) = getRespondedHeaders(capabilities)

val response = ParsedDeltaSharingResponse(
version = version.getOrElse {
Expand All @@ -968,18 +994,20 @@ class DeltaSharingRestClient(
)
},
respondedFormat = respondedFormat,
includedEndStreamAction = includedEndStreamAction,
includeEndStreamActionHeader = includeEndStreamActionHeader,
lines,
capabilities = capabilities
)
checkEndStreamAction(response)
if (checkEndStreamActionHeader) {
checkEndStreamAction(response)
}
response
}

private def checkEndStreamAction(response: ParsedDeltaSharingResponse): Unit = {
if (includeEndStreamAction) {
// Only perform additional check when includeEndStreamAction = true
response.includedEndStreamAction match {
response.includeEndStreamActionHeader match {
case Some(true) =>
val lastLineAction = JsonUtils.fromJson[SingleAction](response.lines.last)
if (lastLineAction.endStreamAction == null) {
Expand All @@ -990,7 +1018,8 @@ class DeltaSharingRestClient(
s"${lastLineAction.unwrap.getClass()}," + getDsQueryIdForLogging)
}
logInfo(
s"Successfully verified endStreamAction in the response" + getDsQueryIdForLogging
s"Successfully verified includeEndStreamAction in the response header" +
getDsQueryIdForLogging
)
case Some(false) =>
logWarning(s"Client sets ${DELTA_SHARING_INCLUDE_END_STREAM_ACTION}=true in the " +
Expand All @@ -1008,11 +1037,11 @@ class DeltaSharingRestClient(

private def getRespondedHeaders(capabilities: Option[String]): (String, Option[Boolean]) = {
val capabilitiesMap = parseDeltaSharingCapabilities(capabilities)
val includedEndStreamActionOpt = capabilitiesMap
val includeEndStreamActionOpt = capabilitiesMap
.get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION)
(
capabilitiesMap.get(RESPONSE_FORMAT).getOrElse(RESPONSE_FORMAT_PARQUET),
includedEndStreamActionOpt.map(_.toBoolean)
includeEndStreamActionOpt.map(_.toBoolean)
)
}

Expand Down
Loading