diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 6c09d625c..30a1a5bc6 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -65,4 +65,24 @@ object FlintSparkIndex { */ def flintIndexNamePrefix(fullTableName: String): String = s"flint_${fullTableName.replace(".", "_")}_" + + // TODO: avoid hardcoding env name below by providing another config + private val EMR_S_APP_ID_KEY = "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID" + private val EMR_S_JOB_ID_KEY = "SERVERLESS_EMR_JOB_ID" + + /** + * Populate environment variables to persist in Flint metadata. + * + * @return + * env key value mapping to populate + */ + def populateEnvToMetadata: Map[String, String] = { + val appId = System.getenv(EMR_S_APP_ID_KEY) + if (appId == null) { + Map.empty + } else { + val jobId = System.getenv(EMR_S_JOB_ID_KEY) + Map(EMR_S_APP_ID_KEY -> appId, EMR_S_JOB_ID_KEY -> jobId) + } + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index e18123a0e..3db325c3e 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -11,7 +11,7 @@ import org.json4s.native.JsonMethods.{compact, parse, render} import org.json4s.native.Serialization import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} -import org.opensearch.flint.spark.FlintSparkIndex.flintIndexNamePrefix +import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, populateEnvToMetadata} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE} @@ -52,7 +52,8 @@ case class FlintSparkCoveringIndex( | "kind": "$kind", | "indexedColumns": $getMetaInfo, | "source": "$tableName", - | "options": $getIndexOptions + | "options": $getIndexOptions, + | "properties": $getIndexProperties | }, | "properties": $getSchema | } @@ -76,6 +77,15 @@ case class FlintSparkCoveringIndex( Serialization.write(options.options) } + private def getIndexProperties: String = { + val envMap = populateEnvToMetadata + if (envMap.isEmpty) { + "{}" + } else { + s"""{ "env": ${Serialization.write(envMap)} }""" + } + } + private def getSchema: String = { val catalogDDL = indexedColumns diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index 9efbc6c4e..dd9cb6bdf 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -11,7 +11,7 @@ import org.json4s.native.Serialization import org.opensearch.flint.core.FlintVersion import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} -import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, ID_COLUMN} +import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, populateEnvToMetadata, ID_COLUMN} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE} import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer @@ -59,7 +59,8 @@ class FlintSparkSkippingIndex( | "kind": "$SKIPPING_INDEX_TYPE", | "indexedColumns": $getMetaInfo, | "source": "$tableName", - | "options": $getIndexOptions + | "options": $getIndexOptions, + | "properties": $getIndexProperties | }, | "properties": $getSchema | } @@ -89,6 +90,15 @@ class FlintSparkSkippingIndex( Serialization.write(options.options) } + private def getIndexProperties: String = { + val envMap = populateEnvToMetadata + if (envMap.isEmpty) { + "{}" + } else { + s"""{ "env": ${Serialization.write(envMap)} }""" + } + } + private def getSchema: String = { val allFieldTypes = indexedColumns.flatMap(_.outputSchema()).toMap + (FILE_PATH_COLUMN -> "string") diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index 717438f7e..ac0b33746 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -57,7 +57,8 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { | "columnType": "int" | }], | "source": "spark_catalog.default.ci_test", - | "options": {} + | "options": {}, + | "properties": {} | }, | "properties": { | "name": { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 05f398b88..8f2de17f0 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -78,7 +78,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "columnType": "int" | }], | "source": "spark_catalog.default.test", - | "options": {} + | "options": {}, + | "properties": {} | }, | "properties": { | "year": { @@ -506,7 +507,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "columnType": "struct" | }], | "source": "$testTable", - | "options": {} + | "options": {}, + | "properties": {} | }, | "properties": { | "boolean_col": {