Skip to content

Commit

Permalink
Persisting runtime env info into Flint metadata (opensearch-project#58)
Browse files Browse the repository at this point in the history
* Persist env in Flint metadata

Signed-off-by: Chen Dai <[email protected]>

* Refactor get env code

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen authored Oct 4, 2023
1 parent e3210f0 commit 7044486
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,20 @@ object FlintSparkIndex {
*/
def flintIndexNamePrefix(fullTableName: String): String = {
  // Every "." in the table name is replaced with "_" before it is embedded in the prefix.
  val sanitizedTableName = fullTableName.replace(".", "_")
  s"flint_${sanitizedTableName}_"
}

/**
 * Collect the environment variables that should be persisted in Flint metadata.
 *
 * Only a fixed list of variable names is consulted. A variable that is not set in the
 * current process environment is left out of the result rather than mapped to null.
 *
 * @return
 *   mapping from environment variable name to its value, containing only the variables
 *   that are currently set
 */
def populateEnvToMetadata: Map[String, String] = {
  // TODO: make the list of env names configurable instead of hardcoding it here
  val persistedEnvNames = Seq("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "SERVERLESS_EMR_JOB_ID")

  persistedEnvNames
    // System.getenv returns null for an unset variable; Option(...) turns that into None
    .map(name => name -> Option(System.getenv(name)))
    .collect { case (name, Some(value)) => name -> value }
    .toMap
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.json4s.native.JsonMethods.{compact, parse, render}
import org.json4s.native.Serialization
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.flintIndexNamePrefix
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, populateEnvToMetadata}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}

Expand Down Expand Up @@ -52,7 +52,8 @@ case class FlintSparkCoveringIndex(
| "kind": "$kind",
| "indexedColumns": $getMetaInfo,
| "source": "$tableName",
| "options": $getIndexOptions
| "options": $getIndexOptions,
| "properties": $getIndexProperties
| },
| "properties": $getSchema
| }
Expand All @@ -76,6 +77,15 @@ case class FlintSparkCoveringIndex(
Serialization.write(options.options)
}

/**
 * Render the index properties as a JSON object string.
 *
 * @return
 *   an empty JSON object when no environment entries were collected, otherwise an object
 *   with a single "env" field holding the serialized environment map
 */
private def getIndexProperties: String =
  populateEnvToMetadata match {
    case env if env.nonEmpty => s"""{ "env": ${Serialization.write(env)} }"""
    case _ => "{}"
  }

private def getSchema: String = {
val catalogDDL =
indexedColumns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.json4s.native.Serialization
import org.opensearch.flint.core.FlintVersion
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, ID_COLUMN}
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, populateEnvToMetadata, ID_COLUMN}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
Expand Down Expand Up @@ -59,7 +59,8 @@ class FlintSparkSkippingIndex(
| "kind": "$SKIPPING_INDEX_TYPE",
| "indexedColumns": $getMetaInfo,
| "source": "$tableName",
| "options": $getIndexOptions
| "options": $getIndexOptions,
| "properties": $getIndexProperties
| },
| "properties": $getSchema
| }
Expand Down Expand Up @@ -89,6 +90,15 @@ class FlintSparkSkippingIndex(
Serialization.write(options.options)
}

/**
 * Build the JSON object string describing this index's properties.
 *
 * @return
 *   an object with a single "env" field containing the serialized environment map, or an
 *   empty JSON object if that map has no entries
 */
private def getIndexProperties: String = {
  val persistedEnv = populateEnvToMetadata
  if (persistedEnv.nonEmpty) s"""{ "env": ${Serialization.write(persistedEnv)} }""" else "{}"
}

private def getSchema: String = {
val allFieldTypes =
indexedColumns.flatMap(_.outputSchema()).toMap + (FILE_PATH_COLUMN -> "string")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
| "columnType": "int"
| }],
| "source": "spark_catalog.default.ci_test",
| "options": {}
| "options": {},
| "properties": {}
| },
| "properties": {
| "name": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| "columnType": "int"
| }],
| "source": "spark_catalog.default.test",
| "options": {}
| "options": {},
| "properties": {}
| },
| "properties": {
| "year": {
Expand Down Expand Up @@ -506,7 +507,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| "columnType": "struct<subfield1:string,subfield2:int>"
| }],
| "source": "$testTable",
| "options": {}
| "options": {},
| "properties": {}
| },
| "properties": {
| "boolean_col": {
Expand Down

0 comments on commit 7044486

Please sign in to comment.