Skip to content

Commit

Permalink
Add resultIndex to session manager extension (#689)
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger authored Sep 24, 2024
1 parent 92121a3 commit f742a65
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,8 @@ object FlintREPL extends Logging with FlintJobExecutor {
resultIndexOption: Option[String]): SessionManager = {
instantiate(
new SessionManagerImpl(spark, resultIndexOption),
spark.conf.get(FlintSparkConf.CUSTOM_SESSION_MANAGER.key, ""))
spark.conf.get(FlintSparkConf.CUSTOM_SESSION_MANAGER.key, ""),
resultIndexOption.getOrElse(""))
}

private def instantiateStatementExecutionManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import org.opensearch.flint.common.model.FlintStatement

import org.apache.spark.internal.Logging
import org.apache.spark.sql.FlintJob.writeDataFrameToOpensearch
import org.apache.spark.sql.flint.config.FlintSparkConf

class QueryResultWriterImpl(context: Map[String, Any]) extends QueryResultWriter with Logging {

private val resultIndex = context("resultIndex").asInstanceOf[String]
private val osClient = context("osClient").asInstanceOf[OSClient]
// Initialize OSClient with Flint options because custom session manager implementation should not have it in the context
private val osClient = new OSClient(FlintSparkConf().flintOptions())

override def writeDataFrame(dataFrame: DataFrame, flintStatement: FlintStatement): Unit = {
writeDataFrameToOpensearch(dataFrame, resultIndex, osClient)
Expand Down

0 comments on commit f742a65

Please sign in to comment.