diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index abe78b13b..019cc7aa5 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -84,7 +84,7 @@ case class FlintSparkMaterializedView( aggregate.copy(child = watermark(timeCol, aggregate.child)) case relation: UnresolvedRelation if !relation.isStreaming => - relation.copy(isStreaming = true, options = optionsWithExtra(relation)) + relation.copy(isStreaming = true, options = optionsWithExtra(spark, relation)) } logicalPlanToDataFrame(spark, streamingPlan) } @@ -98,9 +98,12 @@ case class FlintSparkMaterializedView( EventTimeWatermark(timeCol, IntervalUtils.fromIntervalString(delay), child) } - private def optionsWithExtra(relation: UnresolvedRelation): CaseInsensitiveStringMap = { + private def optionsWithExtra( + spark: SparkSession, + relation: UnresolvedRelation): CaseInsensitiveStringMap = { val originalOptions = relation.options.asCaseSensitiveMap - val extraOptions = options.extraSourceOptions(relation.tableName).asJava + val tableName = qualifyTableName(spark, relation.tableName) + val extraOptions = options.extraSourceOptions(tableName).asJava new CaseInsensitiveStringMap((originalOptions ++ extraOptions).asJava) }