Skip to content

Commit

Permalink
Backport endstreamaction to branc-0.7
Browse files Browse the repository at this point in the history
  • Loading branch information
linzhou-db committed Oct 10, 2024
1 parent 6ee58e7 commit fbd45ca
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,16 @@ class DeltaSharingService(serverConfig: ServerConfig) {
startingVersion = None,
endingVersion = None,
includeRefreshToken = false,
refreshToken = None
refreshToken = None,
includeEndStreamAction = false
)
streamingOutput(Some(v), actions)
}

@Post("/shares/{share}/schemas/{schema}/tables/{table}/query")
@ConsumesJson
def listFiles(
req: HttpRequest,
@Param("share") share: String,
@Param("schema") schema: String,
@Param("table") table: String,
Expand Down Expand Up @@ -343,6 +345,10 @@ class DeltaSharingService(serverConfig: ServerConfig) {
)
}
}
val capabilitiesMap = getDeltaSharingCapabilitiesMap(
req.headers().get(DELTA_SHARING_CAPABILITIES_HEADER)
)
val includeEndStreamAction = getRequestEndStreamAction(capabilitiesMap)
val (version, actions) = deltaSharedTableLoader.loadTable(tableConfig).query(
includeFiles = true,
request.predicateHints,
Expand All @@ -353,7 +359,8 @@ class DeltaSharingService(serverConfig: ServerConfig) {
request.startingVersion,
request.endingVersion,
request.includeRefreshToken.getOrElse(false),
request.refreshToken
request.refreshToken,
includeEndStreamAction = includeEndStreamAction
)
if (version < tableConfig.startVersion) {
throw new DeltaSharingIllegalArgumentException(
Expand All @@ -362,12 +369,13 @@ class DeltaSharingService(serverConfig: ServerConfig) {
}
logger.info(s"Took ${System.currentTimeMillis - start} ms to load the table " +
s"and sign ${actions.length - 2} urls for table $share/$schema/$table")
streamingOutput(Some(version), actions)
streamingOutput(Some(version), actions, includeEndStreamAction)
}

@Get("/shares/{share}/schemas/{schema}/tables/{table}/changes")
@ConsumesJson
def listCdfFiles(
req: HttpRequest,
@Param("share") share: String,
@Param("schema") schema: String,
@Param("table") table: String,
Expand All @@ -384,6 +392,10 @@ class DeltaSharingService(serverConfig: ServerConfig) {
s"$share.$schema.$table")
}

val capabilitiesMap = getDeltaSharingCapabilitiesMap(
req.headers().get(DELTA_SHARING_CAPABILITIES_HEADER)
)
val includeEndStreamAction = getRequestEndStreamAction(capabilitiesMap)
val (v, actions) = deltaSharedTableLoader.loadTable(tableConfig).queryCDF(
getCdfOptionsMap(
Option(startingVersion),
Expand All @@ -395,17 +407,27 @@ class DeltaSharingService(serverConfig: ServerConfig) {
)
logger.info(s"Took ${System.currentTimeMillis - start} ms to load the table cdf " +
s"and sign ${actions.length - 2} urls for table $share/$schema/$table")
streamingOutput(Some(v), actions)
streamingOutput(Some(v), actions, includeEndStreamAction)
}

private def streamingOutput(version: Option[Long], actions: Seq[SingleAction]): HttpResponse = {
private def streamingOutput(
version: Option[Long],
actions: Seq[SingleAction],
includeEndStreamAction: Boolean = false): HttpResponse = {
val capabilities = if (includeEndStreamAction) {
s"$DELTA_SHARING_INCLUDE_END_STREAM_ACTION=true"
} else {
""
}
val headers = if (version.isDefined) {
createHeadersBuilderForTableVersion(version.get)
.set(HttpHeaderNames.CONTENT_TYPE, DELTA_TABLE_METADATA_CONTENT_TYPE)
.set(DELTA_SHARING_CAPABILITIES_HEADER, capabilities)
.build()
} else {
ResponseHeaders.builder(200)
.set(HttpHeaderNames.CONTENT_TYPE, DELTA_TABLE_METADATA_CONTENT_TYPE)
.set(DELTA_SHARING_CAPABILITIES_HEADER, capabilities)
.build()
}
ResponseConversionUtil.streamingFrom(
Expand All @@ -420,12 +442,27 @@ class DeltaSharingService(serverConfig: ServerConfig) {
},
ServiceRequestContext.current().blockingTaskExecutor())
}

private def getDeltaSharingCapabilitiesMap(headerString: String): Map[String, String] = {
if (headerString == null) {
return Map.empty[String, String]
}
headerString.toLowerCase().split(DELTA_SHARING_CAPABILITIES_DELIMITER)
.map(_.split("="))
.filter(_.size == 2)
.map { splits =>
(splits(0), splits(1))
}.toMap
}
}


object DeltaSharingService {
val DELTA_TABLE_VERSION_HEADER = "Delta-Table-Version"
val DELTA_TABLE_METADATA_CONTENT_TYPE = "application/x-ndjson; charset=utf-8"
val DELTA_SHARING_CAPABILITIES_HEADER = "delta-sharing-capabilities"
val DELTA_SHARING_INCLUDE_END_STREAM_ACTION = "includeendstreamaction"
val DELTA_SHARING_CAPABILITIES_DELIMITER = ";"

val SPARK_STRUCTURED_STREAMING = "SparkStructuredStreaming"

Expand Down Expand Up @@ -547,6 +584,11 @@ object DeltaSharingService {
endingTimestamp.map(DeltaDataSource.CDF_END_TIMESTAMP_KEY -> _)).toMap
}

private[server] def getRequestEndStreamAction(
headerCapabilities: Map[String, String]): Boolean = {
headerCapabilities.get(DELTA_SHARING_INCLUDE_END_STREAM_ACTION).exists(_.toBoolean)
}

def main(args: Array[String]): Unit = {
val ns = parser.parseArgsOrFail(args)
val serverConfigPath = ns.getString("config")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ class DeltaSharedTable(
snapshot.version
}

// scalastyle:off argcount
def query(
includeFiles: Boolean,
predicateHints: Seq[String],
Expand All @@ -195,7 +196,8 @@ class DeltaSharedTable(
startingVersion: Option[Long],
endingVersion: Option[Long],
includeRefreshToken: Boolean,
refreshToken: Option[String]
refreshToken: Option[String],
includeEndStreamAction: Boolean
): (Long, Seq[model.SingleAction]) = withClassLoader {
// TODO Support `limitHint`
if (Seq(version, timestamp, startingVersion).filter(_.isDefined).size >= 2) {
Expand Down Expand Up @@ -245,7 +247,7 @@ class DeltaSharedTable(
if (startingVersion.isDefined) {
// Only read changes up to snapshot.version, and ignore changes that are committed during
// queryDataChangeSinceStartVersion.
queryDataChangeSinceStartVersion(startingVersion.get, endingVersion)
queryDataChangeSinceStartVersion(startingVersion.get, endingVersion, includeEndStreamAction)
} else if (includeFiles) {
val ts = if (isVersionQuery) {
val timestampsByVersion = DeltaSharingHistoryManager.getTimestampsByVersion(
Expand Down Expand Up @@ -305,6 +307,8 @@ class DeltaSharedTable(
)
)
Seq(model.EndStreamAction(refreshTokenStr).wrap)
} else if (includeEndStreamAction) {
Seq(model.EndStreamAction(null).wrap)
} else {
Nil
}
Expand All @@ -319,7 +323,8 @@ class DeltaSharedTable(

private def queryDataChangeSinceStartVersion(
startingVersion: Long,
endingVersion: Option[Long]
endingVersion: Option[Long],
includeEndStreamAction: Boolean
): Seq[model.SingleAction] = {
var latestVersion = tableVersion
if (startingVersion > latestVersion) {
Expand Down Expand Up @@ -388,12 +393,16 @@ class DeltaSharedTable(
case _ => ()
}
}
if (includeEndStreamAction) {
actions.append(model.EndStreamAction(null).wrap)
}
actions.toSeq
}

def queryCDF(
cdfOptions: Map[String, String],
includeHistoricalMetadata: Boolean = false
includeHistoricalMetadata: Boolean = false,
includeEndStreamAction: Boolean = false
): (Long, Seq[model.SingleAction]) = withClassLoader {
val actions = ListBuffer[model.SingleAction]()

Expand Down Expand Up @@ -495,6 +504,9 @@ class DeltaSharedTable(
actions.append(modelRemoveFile.wrap)
}
}
if (includeEndStreamAction) {
actions.append(model.EndStreamAction(null).wrap)
}
start -> actions.toSeq
}

Expand Down
Loading

0 comments on commit fbd45ca

Please sign in to comment.