From 71a659591fa40866c598778f9afda54c94e1a856 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 3 Oct 2023 13:35:11 -0700 Subject: [PATCH 1/2] Persist env in Flint metadata Signed-off-by: Chen Dai --- .../flint/spark/FlintSparkIndex.scala | 20 +++++++++++++++++++ .../covering/FlintSparkCoveringIndex.scala | 14 +++++++++++-- .../skipping/FlintSparkSkippingIndex.scala | 14 +++++++++++-- .../FlintSparkCoveringIndexITSuite.scala | 3 ++- .../FlintSparkSkippingIndexITSuite.scala | 6 ++++-- 5 files changed, 50 insertions(+), 7 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 6c09d625c..30a1a5bc6 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -65,4 +65,24 @@ object FlintSparkIndex { */ def flintIndexNamePrefix(fullTableName: String): String = s"flint_${fullTableName.replace(".", "_")}_" + + // TODO: avoid hardcoding env name below by providing another config + private val EMR_S_APP_ID_KEY = "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID" + private val EMR_S_JOB_ID_KEY = "SERVERLESS_EMR_JOB_ID" + + /** + * Populate environment variables to persist in Flint metadata. + * + * @return + * env key value mapping to populate + */ + def populateEnvToMetadata: Map[String, String] = { + val appId = System.getenv(EMR_S_APP_ID_KEY) + if (appId == null) { + Map.empty + } else { + val jobId = System.getenv(EMR_S_JOB_ID_KEY) + Map(EMR_S_APP_ID_KEY -> appId, EMR_S_JOB_ID_KEY -> jobId) + } + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index e18123a0e..3db325c3e 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -11,7 +11,7 @@ import org.json4s.native.JsonMethods.{compact, parse, render} import org.json4s.native.Serialization import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} -import org.opensearch.flint.spark.FlintSparkIndex.flintIndexNamePrefix +import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, populateEnvToMetadata} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE} @@ -52,7 +52,8 @@ case class FlintSparkCoveringIndex( | "kind": "$kind", | "indexedColumns": $getMetaInfo, | "source": "$tableName", - | "options": $getIndexOptions + | "options": $getIndexOptions, + | "properties": $getIndexProperties | }, | "properties": $getSchema | } @@ -76,6 +77,15 @@ case class FlintSparkCoveringIndex( Serialization.write(options.options) } + private def getIndexProperties: String = { + val envMap = populateEnvToMetadata + if (envMap.isEmpty) { + "{}" + } else { + s"""{ "env": ${Serialization.write(envMap)} }""" + } + } + private def getSchema: String = { val catalogDDL = indexedColumns diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index 9efbc6c4e..dd9cb6bdf 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -11,7 +11,7 @@ import org.json4s.native.Serialization import org.opensearch.flint.core.FlintVersion import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} -import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, ID_COLUMN} +import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, populateEnvToMetadata, ID_COLUMN} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE} import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer @@ -59,7 +59,8 @@ class FlintSparkSkippingIndex( | "kind": "$SKIPPING_INDEX_TYPE", | "indexedColumns": $getMetaInfo, | "source": "$tableName", - | "options": $getIndexOptions + | "options": $getIndexOptions, + | "properties": $getIndexProperties | }, | "properties": $getSchema | } @@ -89,6 +90,15 @@ class FlintSparkSkippingIndex( Serialization.write(options.options) } + private def getIndexProperties: String = { + val envMap = populateEnvToMetadata + if (envMap.isEmpty) { + "{}" + } else { + s"""{ "env": ${Serialization.write(envMap)} }""" + } + } + private def getSchema: String = { val allFieldTypes = indexedColumns.flatMap(_.outputSchema()).toMap + (FILE_PATH_COLUMN -> "string") diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index 717438f7e..ac0b33746 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -57,7 +57,8 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { | "columnType": "int" | }], | "source": "spark_catalog.default.ci_test", - | "options": {} + | "options": {}, + | "properties": {} | }, | "properties": { | "name": { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 05f398b88..8f2de17f0 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -78,7 +78,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "columnType": "int" | }], | "source": "spark_catalog.default.test", - | "options": {} + | "options": {}, + | "properties": {} | }, | "properties": { | "year": { @@ -506,7 +507,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "columnType": "struct" | }], | "source": "$testTable", - | "options": {} + | "options": {}, + | "properties": {} | }, | "properties": { | "boolean_col": { From 444811392d7a319a81f987808d445423e53163d5 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 3 Oct 2023 14:32:50 -0700 Subject: [PATCH 2/2] Refactor get env code Signed-off-by: Chen Dai --- .../flint/spark/FlintSparkIndex.scala | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 30a1a5bc6..a19e603dc 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -66,10 +66,6 @@ object FlintSparkIndex { def flintIndexNamePrefix(fullTableName: String): String = s"flint_${fullTableName.replace(".", "_")}_" - // TODO: avoid hardcoding env name below by providing another config - private val EMR_S_APP_ID_KEY = "SERVERLESS_EMR_VIRTUAL_CLUSTER_ID" - private val EMR_S_JOB_ID_KEY = "SERVERLESS_EMR_JOB_ID" - /** * Populate environment variables to persist in Flint metadata. * @@ -77,12 +73,12 @@ object FlintSparkIndex { * env key value mapping to populate */ def populateEnvToMetadata: Map[String, String] = { - val appId = System.getenv(EMR_S_APP_ID_KEY) - if (appId == null) { - Map.empty - } else { - val jobId = System.getenv(EMR_S_JOB_ID_KEY) - Map(EMR_S_APP_ID_KEY -> appId, EMR_S_JOB_ID_KEY -> jobId) - } + // TODO: avoid hardcoding env name below by providing another config + val envNames = Seq("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "SERVERLESS_EMR_JOB_ID") + envNames + .flatMap(key => + Option(System.getenv(key)) + .map(value => key -> value)) + .toMap } }