Skip to content

Commit

Permalink
Qualify table name in relation when fetching extra option
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Oct 18, 2023
1 parent dae275b commit 5cd981a
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ case class FlintSparkMaterializedView(
aggregate.copy(child = watermark(timeCol, aggregate.child))

case relation: UnresolvedRelation if !relation.isStreaming =>
relation.copy(isStreaming = true, options = optionsWithExtra(relation))
relation.copy(isStreaming = true, options = optionsWithExtra(spark, relation))
}
logicalPlanToDataFrame(spark, streamingPlan)
}
Expand All @@ -98,9 +98,12 @@ case class FlintSparkMaterializedView(
EventTimeWatermark(timeCol, IntervalUtils.fromIntervalString(delay), child)
}

private def optionsWithExtra(relation: UnresolvedRelation): CaseInsensitiveStringMap = {
private def optionsWithExtra(
spark: SparkSession,
relation: UnresolvedRelation): CaseInsensitiveStringMap = {
val originalOptions = relation.options.asCaseSensitiveMap
val extraOptions = options.extraSourceOptions(relation.tableName).asJava
val tableName = qualifyTableName(spark, relation.tableName)
val extraOptions = options.extraSourceOptions(tableName).asJava
new CaseInsensitiveStringMap((originalOptions ++ extraOptions).asJava)
}

Expand Down

0 comments on commit 5cd981a

Please sign in to comment.