diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala
index 2eca3f5d3..0bac6ac73 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/package.scala
@@ -14,19 +14,7 @@ import org.apache.spark.sql.connector.catalog._
 package object flint {
 
   /**
-   * Convert data frame to logical plan.
-   *
-   * @param df
-   *   data frame
-   * @return
-   *   logical plan
-   */
-  def dataFrameToLogicalPlan(df: DataFrame): LogicalPlan = {
-    df.logicalPlan
-  }
-
-  /**
-   * Convert logical plan to data frame.
+   * Convert the given logical plan to a Spark data frame.
    *
    * @param spark
    *   Spark session
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 64f103d56..0ef4e1108 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -10,20 +10,12 @@ import scala.collection.JavaConverters._
 import org.json4s.{Formats, NoTypeHints}
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
-import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
 import org.opensearch.flint.spark.FlintSparkIndex.{ID_COLUMN, StreamingRefresh}
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
-import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
 import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
-import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
-import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
-import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.{SkippingKind, SkippingKindSerializer}
-import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
-import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
-import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
-import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
 
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.SaveMode._
@@ -179,7 +171,10 @@ class FlintSpark(val spark: SparkSession) {
    *   Flint index list
    */
   def describeIndexes(indexNamePattern: String): Seq[FlintSparkIndex] = {
-    flintClient.getAllIndexMetadata(indexNamePattern).asScala.map(deserialize)
+    flintClient
+      .getAllIndexMetadata(indexNamePattern)
+      .asScala
+      .map(FlintSparkIndexFactory.create)
   }
 
   /**
@@ -193,7 +188,8 @@ class FlintSpark(val spark: SparkSession) {
   def describeIndex(indexName: String): Option[FlintSparkIndex] = {
     if (flintClient.exists(indexName)) {
       val metadata = flintClient.getIndexMetadata(indexName)
-      Some(deserialize(metadata))
+      val index = FlintSparkIndexFactory.create(metadata)
+      Some(index)
     } else {
       Option.empty
     }
@@ -253,54 +249,6 @@ class FlintSpark(val spark: SparkSession) {
       .map(Trigger.ProcessingTime)
       .getOrElse(Trigger.ProcessingTime(0L))
   }
-
-  // TODO: move this to Flint index factory
-  private def deserialize(metadata: FlintMetadata): FlintSparkIndex = {
-    val indexOptions = FlintSparkIndexOptions(
-      metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap)
-
-    // Convert generic Map[String,AnyRef] in metadata to specific data structure in Flint index
-    metadata.kind match {
-      case SKIPPING_INDEX_TYPE =>
-        val strategies = metadata.indexedColumns.map { colInfo =>
-          val skippingKind = SkippingKind.withName(getString(colInfo, "kind"))
-          val columnName = getString(colInfo, "columnName")
-          val columnType = getString(colInfo, "columnType")
-
-          skippingKind match {
-            case PARTITION =>
-              PartitionSkippingStrategy(columnName = columnName, columnType = columnType)
-            case VALUE_SET =>
-              ValueSetSkippingStrategy(columnName = columnName, columnType = columnType)
-            case MIN_MAX =>
-              MinMaxSkippingStrategy(columnName = columnName, columnType = columnType)
-            case other =>
-              throw new IllegalStateException(s"Unknown skipping strategy: $other")
-          }
-        }
-        new FlintSparkSkippingIndex(metadata.source, strategies, indexOptions)
-      case COVERING_INDEX_TYPE =>
-        new FlintSparkCoveringIndex(
-          metadata.name,
-          metadata.source,
-          metadata.indexedColumns.map { colInfo =>
-            getString(colInfo, "columnName") -> getString(colInfo, "columnType")
-          }.toMap,
-          indexOptions)
-      case MV_INDEX_TYPE =>
-        new FlintSparkMaterializedView(
-          metadata.name,
-          metadata.source,
-          metadata.indexedColumns.map { colInfo =>
-            getString(colInfo, "columnName") -> getString(colInfo, "columnType")
-          }.toMap,
-          indexOptions)
-    }
-  }
-
-  private def getString(map: java.util.Map[String, AnyRef], key: String): String = {
-    map.get(key).asInstanceOf[String]
-  }
 }
 
 object FlintSpark {
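Note on the two hunks above: index lookup in FlintSpark now reduces to a thin wrapper over the Flint client plus the new factory, so FlintSpark no longer references any concrete index class during deserialization. A minimal caller-side sketch (the index names below are hypothetical; `spark` is an existing SparkSession):

    import org.apache.spark.sql.SparkSession
    import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex}

    val spark: SparkSession = SparkSession.builder().getOrCreate()
    val flint = new FlintSpark(spark)

    // Both lookups delegate metadata deserialization to FlintSparkIndexFactory.create.
    val one: Option[FlintSparkIndex] =
      flint.describeIndex("flint_mycatalog_default_logs_skipping_index")
    val all: Seq[FlintSparkIndex] =
      flint.describeIndexes("flint_mycatalog_*")
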
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
index 31664e3cb..0586bfc49 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala
@@ -120,7 +120,7 @@ object FlintSparkIndex {
     builder.kind(index.kind)
     builder.options(index.options.optionsWithDefault.mapValues(_.asInstanceOf[AnyRef]).asJava)
 
-    // Optional index properties
+    // Index properties
     val envs = populateEnvToMetadata
     if (envs.nonEmpty) {
       builder.addProperty("env", envs.asJava)
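Aside on the `mapValues` pattern in this hunk: in Scala 2.12, `Map.mapValues` returns a lazy view, so the `asInstanceOf` cast runs again on every read through the resulting Java map. The metadata builder consumes the map right away, so that is harmless here, but forcing a strict copy is the safer default. A self-contained sketch (the option values are illustrative):

    import scala.collection.JavaConverters._

    val options: Map[String, String] = Map("auto_refresh" -> "true")

    // .toMap forces a strict copy before the map crosses into Java code;
    // without it, mapValues yields a view that recomputes on each access.
    val javaOptions: java.util.Map[String, AnyRef] =
      options.mapValues(_.asInstanceOf[AnyRef]).toMap.asJava
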
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala
new file mode 100644
index 000000000..c5c85d2ed
--- /dev/null
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala
@@ -0,0 +1,83 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.spark
+
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
+import org.opensearch.flint.core.metadata.FlintMetadata
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.COVERING_INDEX_TYPE
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
+import org.opensearch.flint.spark.mv.FlintSparkMaterializedView.MV_INDEX_TYPE
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
+import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
+import org.opensearch.flint.spark.skipping.partition.PartitionSkippingStrategy
+import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy
+
+/**
+ * Flint Spark index factory that encapsulates the creation of concrete Flint index instances.
+ * This is for internal use only, not part of the user-facing API.
+ */
+object FlintSparkIndexFactory {
+
+  /**
+   * Creates Flint index from generic Flint metadata.
+   *
+   * @param metadata
+   *   Flint metadata
+   * @return
+   *   Flint index
+   */
+  def create(metadata: FlintMetadata): FlintSparkIndex = {
+    val indexOptions = FlintSparkIndexOptions(
+      metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap)
+
+    // Convert generic Map[String,AnyRef] in metadata to specific data structure in Flint index
+    metadata.kind match {
+      case SKIPPING_INDEX_TYPE =>
+        val strategies = metadata.indexedColumns.map { colInfo =>
+          val skippingKind = SkippingKind.withName(getString(colInfo, "kind"))
+          val columnName = getString(colInfo, "columnName")
+          val columnType = getString(colInfo, "columnType")
+
+          skippingKind match {
+            case PARTITION =>
+              PartitionSkippingStrategy(columnName = columnName, columnType = columnType)
+            case VALUE_SET =>
+              ValueSetSkippingStrategy(columnName = columnName, columnType = columnType)
+            case MIN_MAX =>
+              MinMaxSkippingStrategy(columnName = columnName, columnType = columnType)
+            case other =>
+              throw new IllegalStateException(s"Unknown skipping strategy: $other")
+          }
+        }
+        new FlintSparkSkippingIndex(metadata.source, strategies, indexOptions)
+      case COVERING_INDEX_TYPE =>
+        new FlintSparkCoveringIndex(
+          metadata.name,
+          metadata.source,
+          metadata.indexedColumns.map { colInfo =>
+            getString(colInfo, "columnName") -> getString(colInfo, "columnType")
+          }.toMap,
+          indexOptions)
+      case MV_INDEX_TYPE =>
+        new FlintSparkMaterializedView(
+          metadata.name,
+          metadata.source,
+          metadata.indexedColumns.map { colInfo =>
+            getString(colInfo, "columnName") -> getString(colInfo, "columnType")
+          }.toMap,
+          indexOptions)
+    }
+  }
+
+  private def getString(map: java.util.Map[String, AnyRef], key: String): String = {
+    map.get(key).asInstanceOf[String]
+  }
+}
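The new factory centralizes the kind dispatch that previously lived in FlintSpark.deserialize. One observation: the outer `metadata.kind` match has no default branch, so an unrecognized kind surfaces as a scala.MatchError rather than the descriptive IllegalStateException thrown by the inner skipping-kind match; that may be worth a follow-up. On the caller side, the concrete index type can be recovered by pattern matching. A sketch (`describeKind` is a hypothetical helper; `metadata` is assumed to come from FlintClient, as in describeIndex above):

    import org.opensearch.flint.core.metadata.FlintMetadata
    import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
    import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
    import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex

    def describeKind(metadata: FlintMetadata): String =
      FlintSparkIndexFactory.create(metadata) match {
        case _: FlintSparkSkippingIndex    => "skipping index"
        case _: FlintSparkCoveringIndex    => "covering index"
        case _: FlintSparkMaterializedView => "materialized view"
      }
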
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
index 92906f6ab..e68a8b958 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala
@@ -23,7 +23,6 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, EventTimeWatermark, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.flint.logicalPlanToDataFrame
-import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * Flint materialized view in Spark.
@@ -92,10 +91,7 @@ case class FlintSparkMaterializedView(
   }
 
   private def watermark(timeCol: Attribute, delay: String, child: LogicalPlan) = {
-    EventTimeWatermark(
-      timeCol,
-      IntervalUtils.stringToInterval(UTF8String.fromString(delay)),
-      child)
+    EventTimeWatermark(timeCol, IntervalUtils.fromIntervalString(delay), child)
   }
 
   /**
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
index 4fe4dd1f6..98d620b58 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
@@ -80,7 +80,7 @@ class FlintSparkSkippingIndex(
     df.getOrElse(spark.read.table(tableName))
       .groupBy(input_file_name().as(FILE_PATH_COLUMN))
       .agg(namedAggFuncs.head, namedAggFuncs.tail: _*)
-      .withColumn(ID_COLUMN, sha1(col(FILE_PATH_COLUMN))) // TODO: no impact to just add it?
+      .withColumn(ID_COLUMN, sha1(col(FILE_PATH_COLUMN)))
 }
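Two closing notes. On the watermark hunk: `IntervalUtils.fromIntervalString` takes a plain String directly, which removes the manual UTF8String wrapping; as far as I know it also accepts ANSI `INTERVAL ...` literal forms that `stringToInterval` rejects. Both methods are Spark catalyst internals, so signatures may drift across Spark versions. A before/after sketch with a hypothetical delay value:

    import org.apache.spark.sql.catalyst.util.IntervalUtils
    import org.apache.spark.unsafe.types.UTF8String

    // Old path: wrap the String into a UTF8String first.
    val before = IntervalUtils.stringToInterval(UTF8String.fromString("10 minutes"))

    // New path: parse the String directly.
    val after = IntervalUtils.fromIntervalString("10 minutes")

On the skipping-index hunk: removing the TODO looks right. ID_COLUMN derived as sha1 of the input file path gives each source file a stable document ID, so an index refresh upserts one row per file rather than appending duplicates.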