Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persisting runtime env info into Flint metadata #58

Merged
merged 2 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,20 @@ object FlintSparkIndex {
*/
def flintIndexNamePrefix(fullTableName: String): String =
s"flint_${fullTableName.replace(".", "_")}_"

/**
* Populate environment variables to persist in Flint metadata.
*
* @return
* env key value mapping to populate
*/
def populateEnvToMetadata: Map[String, String] = {
// TODO: avoid hardcoding env name below by providing another config
val envNames = Seq("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "SERVERLESS_EMR_JOB_ID")
envNames
.flatMap(key =>
Option(System.getenv(key))
.map(value => key -> value))
.toMap
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.json4s.native.JsonMethods.{compact, parse, render}
import org.json4s.native.Serialization
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.flintIndexNamePrefix
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, populateEnvToMetadata}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}

Expand Down Expand Up @@ -52,7 +52,8 @@ case class FlintSparkCoveringIndex(
| "kind": "$kind",
| "indexedColumns": $getMetaInfo,
| "source": "$tableName",
| "options": $getIndexOptions
| "options": $getIndexOptions,
| "properties": $getIndexProperties
| },
| "properties": $getSchema
| }
Expand All @@ -76,6 +77,15 @@ case class FlintSparkCoveringIndex(
Serialization.write(options.options)
}

private def getIndexProperties: String = {
val envMap = populateEnvToMetadata
if (envMap.isEmpty) {
"{}"
} else {
s"""{ "env": ${Serialization.write(envMap)} }"""
}
}

private def getSchema: String = {
val catalogDDL =
indexedColumns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import org.json4s.native.Serialization
import org.opensearch.flint.core.FlintVersion
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, ID_COLUMN}
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, populateEnvToMetadata, ID_COLUMN}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
Expand Down Expand Up @@ -59,7 +59,8 @@ class FlintSparkSkippingIndex(
| "kind": "$SKIPPING_INDEX_TYPE",
| "indexedColumns": $getMetaInfo,
| "source": "$tableName",
| "options": $getIndexOptions
| "options": $getIndexOptions,
| "properties": $getIndexProperties
| },
| "properties": $getSchema
| }
Expand Down Expand Up @@ -89,6 +90,15 @@ class FlintSparkSkippingIndex(
Serialization.write(options.options)
}

private def getIndexProperties: String = {
val envMap = populateEnvToMetadata
if (envMap.isEmpty) {
"{}"
} else {
s"""{ "env": ${Serialization.write(envMap)} }"""
}
}

private def getSchema: String = {
val allFieldTypes =
indexedColumns.flatMap(_.outputSchema()).toMap + (FILE_PATH_COLUMN -> "string")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
| "columnType": "int"
| }],
| "source": "spark_catalog.default.ci_test",
| "options": {}
| "options": {},
| "properties": {}
| },
| "properties": {
| "name": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| "columnType": "int"
| }],
| "source": "spark_catalog.default.test",
| "options": {}
| "options": {},
| "properties": {}
| },
| "properties": {
| "year": {
Expand Down Expand Up @@ -506,7 +507,8 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite {
| "columnType": "struct<subfield1:string,subfield2:int>"
| }],
| "source": "$testTable",
| "options": {}
| "options": {},
| "properties": {}
| },
| "properties": {
| "boolean_col": {
Expand Down