From f742a655fa2cdb80ffaf78e51647f005522988de Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Tue, 24 Sep 2024 12:55:16 -0700 Subject: [PATCH] Add resultIndex to session manager extension (#689) Signed-off-by: Louis Chu --- .../src/main/scala/org/apache/spark/sql/FlintREPL.scala | 3 ++- .../scala/org/apache/spark/sql/QueryResultWriterImpl.scala | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala index 635a5226e..a0516a37a 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala @@ -983,7 +983,8 @@ object FlintREPL extends Logging with FlintJobExecutor { resultIndexOption: Option[String]): SessionManager = { instantiate( new SessionManagerImpl(spark, resultIndexOption), - spark.conf.get(FlintSparkConf.CUSTOM_SESSION_MANAGER.key, "")) + spark.conf.get(FlintSparkConf.CUSTOM_SESSION_MANAGER.key, ""), + resultIndexOption.getOrElse("")) } private def instantiateStatementExecutionManager( diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/QueryResultWriterImpl.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/QueryResultWriterImpl.scala index 238f8fa3d..23d7f42a1 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/QueryResultWriterImpl.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/QueryResultWriterImpl.scala @@ -9,11 +9,13 @@ import org.opensearch.flint.common.model.FlintStatement import org.apache.spark.internal.Logging import org.apache.spark.sql.FlintJob.writeDataFrameToOpensearch +import org.apache.spark.sql.flint.config.FlintSparkConf class QueryResultWriterImpl(context: Map[String, Any]) extends QueryResultWriter with Logging { private val resultIndex = context("resultIndex").asInstanceOf[String] - private val osClient = context("osClient").asInstanceOf[OSClient] + // Initialize OSClient with Flint options because custom session manager implementation should not have it in the context + private val osClient = new OSClient(FlintSparkConf().flintOptions()) override def writeDataFrame(dataFrame: DataFrame, flintStatement: FlintStatement): Unit = { writeDataFrameToOpensearch(dataFrame, resultIndex, osClient)