diff --git a/docs/index.md b/docs/index.md index 31147aed4..74f3648d6 100644 --- a/docs/index.md +++ b/docs/index.md @@ -278,6 +278,34 @@ DROP MATERIALIZED VIEW alb_logs_metrics VACUUM MATERIALIZED VIEW alb_logs_metrics ``` +#### All Indexes + +- **Show Flint Indexes**: Displays all Flint indexes with their information. It outputs the following columns: + - flint_index_name: the full OpenSearch index name + - kind: type of the index (skipping / covering / mv) + - database: database name for the index + - table: source table name for skipping and covering indexes + - index_name: user-defined name of the covering index or materialized view + - auto_refresh: auto-refresh setting of the index (true / false) + - status: current status of the index + +```sql +SHOW FLINT [INDEX|INDEXES] IN catalog[.database] +``` + +Example: +``` +sql> SHOW FLINT INDEXES IN spark_catalog.default; +fetched rows / total rows = 3/3 ++-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+ +| flint_index_name | kind | database | table | index_name | auto_refresh | status | |-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------| +| flint_spark_catalog_default_http_count_view | mv | default | NULL | http_count_view | false | active | +| flint_spark_catalog_default_http_logs_skipping_index | skipping | default | http_logs | NULL | true | refreshing | +| flint_spark_catalog_default_http_logs_status_clientip_index | covering | default | http_logs | status_clientip | false | active | ++-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+ +``` + #### Create Index Options User can provide the following options in `WITH` clause of create statement: diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java index 410d896d2..7020deba8 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/FlintOptions.java @@ -78,6 +78,8 @@ public class FlintOptions implements Serializable { public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000; public static final int DEFAULT_INACTIVITY_LIMIT_MILLIS = 10 * 60 * 1000; + + public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name"; public FlintOptions(Map<String, String> options) { this.options = options; @@ -133,4 +135,8 @@ public String getPassword() { public int getSocketTimeoutMillis() { return Integer.parseInt(options.getOrDefault(SOCKET_TIMEOUT_MILLIS, String.valueOf(DEFAULT_SOCKET_TIMEOUT_MILLIS))); } + + public String getDataSourceName() { + return options.getOrDefault(DATA_SOURCE_NAME, ""); + } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala index 5e2baceab..f62731643 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala +++ b/flint-core/src/main/scala/org/opensearch/flint/core/metadata/FlintMetadata.scala @@ -10,6 +10,7 @@ import java.util import org.opensearch.flint.core.FlintVersion import org.opensearch.flint.core.FlintVersion.current import org.opensearch.flint.core.metadata.FlintJsonHelper._ +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry /** * Flint metadata follows
Flint index specification and defines metadata for a Flint index @@ -32,8 +33,10 @@ case class FlintMetadata( properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef], /** Flint index schema */ schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef], - /** Optional latest metadata log entry */ + /** Optional latest metadata log entry id */ latestId: Option[String] = None, + /** Optional latest metadata log entry */ + latestLogEntry: Option[FlintMetadataLogEntry] = None, /** Optional Flint index settings. TODO: move elsewhere? */ indexSettings: Option[String]) { @@ -79,6 +82,26 @@ case class FlintMetadata( object FlintMetadata { + /** + * Construct Flint metadata with JSON content, index settings, and latest log entry. + * + * @param content + * JSON content + * @param settings + * index settings + * @param latestLogEntry + * latest metadata log entry + * @return + * Flint metadata + */ + def apply( + content: String, + settings: String, + latestLogEntry: FlintMetadataLogEntry): FlintMetadata = { + val metadata = FlintMetadata(content, settings) + metadata.copy(latestLogEntry = Option(latestLogEntry)) + } + /** * Construct Flint metadata with JSON content and index settings. * @@ -153,6 +176,8 @@ object FlintMetadata { private var indexedColumns: Array[util.Map[String, AnyRef]] = Array() private var properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() private var schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]() + private var latestId: Option[String] = None + private var latestLogEntry: Option[FlintMetadataLogEntry] = None private var indexSettings: Option[String] = None def version(version: FlintVersion): this.type = { @@ -215,6 +240,12 @@ object FlintMetadata { this } + def latestLogEntry(entry: FlintMetadataLogEntry): this.type = { + this.latestId = Option(entry.id) + this.latestLogEntry = Option(entry) + this + } + def indexSettings(indexSettings: String): this.type = { this.indexSettings = Option(indexSettings) this @@ -231,7 +262,9 @@ object FlintMetadata { options = options, properties = properties, schema = schema, - indexSettings = indexSettings) + indexSettings = indexSettings, + latestId = latestId, + latestLogEntry = latestLogEntry) } } } diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java index 45aedbaa6..da5877262 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/FlintOpenSearchClient.java @@ -17,6 +17,7 @@ import java.util.List; import java.util.Locale; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; @@ -93,8 +94,8 @@ public FlintOpenSearchClient(FlintOptions options) { } @Override - public OptimisticTransaction startTransaction(String indexName, String dataSourceName, - boolean forceInit) { + public OptimisticTransaction startTransaction( + String indexName, String dataSourceName, boolean forceInit) { LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName); String metaLogIndexName = dataSourceName.isEmpty() ? 
META_LOG_NAME_PREFIX : META_LOG_NAME_PREFIX + "_" + dataSourceName; @@ -164,7 +165,8 @@ public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) { GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT); return Arrays.stream(response.getIndices()) - .map(index -> FlintMetadata.apply( + .map(index -> constructFlintMetadata( + index, response.getMappings().get(index).source().toString(), response.getSettings().get(index).toString())) .collect(Collectors.toList()); @@ -183,7 +185,7 @@ public FlintMetadata getIndexMetadata(String indexName) { MappingMetadata mapping = response.getMappings().get(osIndexName); Settings settings = response.getSettings().get(osIndexName); - return FlintMetadata.apply(mapping.source().string(), settings.toString()); + return constructFlintMetadata(indexName, mapping.source().string(), settings.toString()); } catch (Exception e) { throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e); } @@ -287,6 +289,34 @@ public IRestHighLevelClient createClient() { return new RestHighLevelClientWrapper(new RestHighLevelClient(restClientBuilder)); } + /* + * Constructs Flint metadata with the latest metadata log entry attached if it's available. + * It relies on FlintOptions to provide the data source name. + */ + private FlintMetadata constructFlintMetadata(String indexName, String mapping, String settings) { + String dataSourceName = options.getDataSourceName(); + String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX + : META_LOG_NAME_PREFIX + "_" + dataSourceName; + Optional<FlintMetadataLogEntry> latest = Optional.empty(); + + try (IRestHighLevelClient client = createClient()) { + if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) { + LOG.info("Found metadata log index " + metaLogIndexName); + FlintOpenSearchMetadataLog metadataLog = + new FlintOpenSearchMetadataLog(this, indexName, metaLogIndexName); + latest = metadataLog.getLatest(); + } + } catch (IOException e) { + throw new IllegalStateException("Failed to check if metadata log index exists " + metaLogIndexName, e); + } + + if (latest.isEmpty()) { + return FlintMetadata.apply(mapping, settings); + } else { + return FlintMetadata.apply(mapping, settings, latest.get()); + } + } + /* * Because OpenSearch requires all lowercase letters in index name, we have to * lowercase all letters in the given Flint index name.
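Note on the two changes above: `FlintOptions.getDataSourceName` and `constructFlintMetadata` together derive the metadata log index name from the configured data source, the same way `startTransaction` does. Below is a minimal standalone sketch of that naming rule; the prefix value and the helper name `resolveMetaLogIndexName` are assumptions for illustration only (the real constant is `META_LOG_NAME_PREFIX` in `FlintOpenSearchClient`).

```scala
// Sketch of the metadata log index naming used by startTransaction and
// constructFlintMetadata. The prefix value below is an assumption; the
// actual constant lives in FlintOpenSearchClient.META_LOG_NAME_PREFIX.
object MetaLogIndexNaming {
  private val MetaLogNamePrefix = ".query_execution_request" // assumed value

  // An empty data source name (the getDataSourceName default) falls back
  // to the bare prefix, mirroring options.getOrDefault(DATA_SOURCE_NAME, "").
  def resolveMetaLogIndexName(dataSourceName: String): String =
    if (dataSourceName.isEmpty) MetaLogNamePrefix
    else s"${MetaLogNamePrefix}_$dataSourceName"

  def main(args: Array[String]): Unit = {
    println(resolveMetaLogIndexName(""))     // .query_execution_request
    println(resolveMetaLogIndexName("mys3")) // .query_execution_request_mys3
  }
}
```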
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala index 248d105a2..702b1475e 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndex.scala @@ -8,6 +8,7 @@ package org.opensearch.flint.spark import scala.collection.JavaConverters.mapAsJavaMapConverter import org.opensearch.flint.core.metadata.FlintMetadata +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.flint.datatype.FlintDataType @@ -28,6 +29,11 @@ trait FlintSparkIndex { */ val options: FlintSparkIndexOptions + /** + * Latest metadata log entry for index + */ + val latestLogEntry: Option[FlintMetadataLogEntry] + /** * @return * Flint index name @@ -151,6 +157,12 @@ object FlintSparkIndex { if (settings.isDefined) { builder.indexSettings(settings.get) } + + // Optional latest metadata log entry + val latestLogEntry = index.latestLogEntry + if (latestLogEntry.isDefined) { + builder.latestLogEntry(latestLogEntry.get) + } builder } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala index 7a783a610..847b06984 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexFactory.scala @@ -40,6 +40,7 @@ object FlintSparkIndexFactory { def create(metadata: FlintMetadata): FlintSparkIndex = { val indexOptions = FlintSparkIndexOptions( metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap) + val latestLogEntry = metadata.latestLogEntry // Convert generic Map[String,AnyRef] in metadata to specific data structure in Flint index metadata.kind match { @@ -69,7 +70,7 @@ object FlintSparkIndexFactory { throw new IllegalStateException(s"Unknown skipping strategy: $other") } } - FlintSparkSkippingIndex(metadata.source, strategies, indexOptions) + FlintSparkSkippingIndex(metadata.source, strategies, indexOptions, latestLogEntry) case COVERING_INDEX_TYPE => FlintSparkCoveringIndex( metadata.name, @@ -78,7 +79,8 @@ object FlintSparkIndexFactory { getString(colInfo, "columnName") -> getString(colInfo, "columnType") }.toMap, getOptString(metadata.properties, "filterCondition"), - indexOptions) + indexOptions, + latestLogEntry) case MV_INDEX_TYPE => FlintSparkMaterializedView( metadata.name, @@ -86,7 +88,8 @@ object FlintSparkIndexFactory { metadata.indexedColumns.map { colInfo => getString(colInfo, "columnName") -> getString(colInfo, "columnType") }.toMap, - indexOptions) + indexOptions, + latestLogEntry) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index e23126c68..edbab78b6 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -8,6 +8,7 @@ package org.opensearch.flint.spark.covering import 
scala.collection.JavaConverters.mapAsJavaMapConverter import org.opensearch.flint.core.metadata.FlintMetadata +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.spark._ import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, quotedTableName} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty @@ -24,13 +25,20 @@ import org.apache.spark.sql._ * source table name * @param indexedColumns * indexed column list + * @param filterCondition + * filtering condition + * @param options + * index options + * @param latestLogEntry + * latest metadata log entry for index */ case class FlintSparkCoveringIndex( indexName: String, tableName: String, indexedColumns: Map[String, String], filterCondition: Option[String] = None, - override val options: FlintSparkIndexOptions = empty) + override val options: FlintSparkIndexOptions = empty, + override val latestLogEntry: Option[FlintMetadataLogEntry] = None) extends FlintSparkIndex { require(indexedColumns.nonEmpty, "indexed columns must not be empty") diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala index 656cc387d..31d1d91f3 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/mv/FlintSparkMaterializedView.scala @@ -11,6 +11,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import scala.collection.convert.ImplicitConversions.`map AsScala` import org.opensearch.flint.core.metadata.FlintMetadata +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions} import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, StreamingRefresh} import org.opensearch.flint.spark.FlintSparkIndexOptions.empty @@ -37,12 +38,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap * output schema * @param options * index options + * @param latestLogEntry + * latest metadata log entry for index */ case class FlintSparkMaterializedView( mvName: String, query: String, outputSchema: Map[String, String], - override val options: FlintSparkIndexOptions = empty) + override val options: FlintSparkIndexOptions = empty, + override val latestLogEntry: Option[FlintMetadataLogEntry] = None) extends FlintSparkIndex with StreamingRefresh { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index c27a7f7e2..db56386f1 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -8,6 +8,7 @@ package org.opensearch.flint.spark.skipping import scala.collection.JavaConverters.mapAsJavaMapConverter import org.opensearch.flint.core.metadata.FlintMetadata +import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.spark._ import org.opensearch.flint.spark.FlintSparkIndex._ import 
org.opensearch.flint.spark.FlintSparkIndexOptions.empty @@ -28,11 +29,16 @@ import org.apache.spark.sql.functions.{col, input_file_name, sha1} * source table name * @param indexedColumns * indexed column list + * @param options + * index options + * @param latestLogEntry + * latest metadata log entry for index */ case class FlintSparkSkippingIndex( tableName: String, indexedColumns: Seq[FlintSparkSkippingStrategy], - override val options: FlintSparkIndexOptions = empty) + override val options: FlintSparkIndexOptions = empty, + override val latestLogEntry: Option[FlintMetadataLogEntry] = None) extends FlintSparkIndex { require(indexedColumns.nonEmpty, "indexed columns must not be empty") diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala index 011bb37fe..f3f7b42b6 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala @@ -9,6 +9,7 @@ import org.antlr.v4.runtime.ParserRuleContext import org.antlr.v4.runtime.tree.{ParseTree, RuleNode} import org.opensearch.flint.spark.FlintSpark import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder +import org.opensearch.flint.spark.sql.index.FlintSparkIndexAstBuilder import org.opensearch.flint.spark.sql.job.FlintSparkIndexJobAstBuilder import org.opensearch.flint.spark.sql.mv.FlintSparkMaterializedViewAstBuilder import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder @@ -26,6 +27,7 @@ class FlintSparkSqlAstBuilder with FlintSparkSkippingIndexAstBuilder with FlintSparkCoveringIndexAstBuilder with FlintSparkMaterializedViewAstBuilder + with FlintSparkIndexAstBuilder with FlintSparkIndexJobAstBuilder with SparkSqlAstBuilder { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/index/FlintSparkIndexAstBuilder.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/index/FlintSparkIndexAstBuilder.scala new file mode 100644 index 000000000..925490547 --- /dev/null +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/index/FlintSparkIndexAstBuilder.scala @@ -0,0 +1,74 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark.sql.index + +import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable` + +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex +import org.opensearch.flint.spark.sql.{FlintSparkSqlCommand, FlintSparkSqlExtensionsVisitor, SparkSqlAstBuilder} +import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.ShowFlintIndexStatementContext + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.Command +import org.apache.spark.sql.types.{BooleanType, StringType} + +/** + * Flint Spark AST builder that builds Spark command for Flint index management statement. 
+ */ +trait FlintSparkIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[AnyRef] { + self: SparkSqlAstBuilder => + + override def visitShowFlintIndexStatement(ctx: ShowFlintIndexStatementContext): Command = { + val outputSchema = Seq( + AttributeReference("flint_index_name", StringType, nullable = false)(), + AttributeReference("kind", StringType, nullable = false)(), + AttributeReference("database", StringType, nullable = false)(), + AttributeReference("table", StringType, nullable = true)(), + AttributeReference("index_name", StringType, nullable = true)(), + AttributeReference("auto_refresh", BooleanType, nullable = false)(), + AttributeReference("status", StringType, nullable = false)()) + + FlintSparkSqlCommand(outputSchema) { flint => + val catalogDbName = + ctx.catalogDb.parts + .map(part => part.getText) + .mkString("_") + val indexNamePattern = s"flint_${catalogDbName}_*" + flint + .describeIndexes(indexNamePattern) + .map { index => + val (databaseName, tableName, indexName) = index match { + case skipping: FlintSparkSkippingIndex => + val parts = skipping.tableName.split('.') + (parts(1), parts.drop(2).mkString("."), null) + case covering: FlintSparkCoveringIndex => + val parts = covering.tableName.split('.') + (parts(1), parts.drop(2).mkString("."), covering.indexName) + case mv: FlintSparkMaterializedView => + val parts = mv.mvName.split('.') + (parts(1), null, parts.drop(2).mkString(".")) + } + + val status = index.latestLogEntry match { + case Some(entry) => entry.state.toString + case None => "unavailable" + } + + Row( + index.name, + index.kind, + databaseName, + tableName, + indexName, + index.options.autoRefresh(), + status) + } + } + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala index ba9acffd1..1e2219600 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/OpenSearchTransactionSuite.scala @@ -22,6 +22,8 @@ import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.I import org.opensearch.flint.core.storage.FlintOpenSearchClient._ import org.opensearch.flint.spark.FlintSparkSuite +import org.apache.spark.sql.flint.config.FlintSparkConf.DATA_SOURCE_NAME + /** * Transaction test base suite that creates the metadata log index which enables transaction * support in index operation. 
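For clarity, the `database`, `table`, and `index_name` columns built in `visitShowFlintIndexStatement` above come from splitting the fully qualified Spark name: `parts(0)` is the catalog (dropped), `parts(1)` is the database, and the remainder is the source table or MV name. A minimal standalone sketch of that convention, with an illustrative helper name:

```scala
// Standalone illustration of the qualified-name splitting done in
// visitShowFlintIndexStatement: catalog.database.rest -> (database, rest).
object QualifiedNameSplit {
  def databaseAndRest(qualifiedName: String): (String, String) = {
    val parts = qualifiedName.split('.')
    (parts(1), parts.drop(2).mkString("."))
  }

  def main(args: Array[String]): Unit = {
    // Skipping/covering index: the remainder fills the "table" column
    println(databaseAndRest("spark_catalog.default.http_logs"))       // (default,http_logs)
    // Materialized view: the remainder fills the "index_name" column
    println(databaseAndRest("spark_catalog.default.http_count_view")) // (default,http_count_view)
  }
}
```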
@@ -33,7 +35,7 @@ trait OpenSearchTransactionSuite extends FlintSparkSuite { override def beforeAll(): Unit = { super.beforeAll() - spark.conf.set("spark.flint.datasource.name", testDataSourceName) + spark.conf.set(DATA_SOURCE_NAME.key, testDataSourceName) } override def beforeEach(): Unit = { diff --git a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala index fa072898b..7dc5c695c 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/core/FlintTransactionITSuite.scala @@ -11,12 +11,17 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter import org.json4s.{Formats, NoTypeHints} import org.json4s.native.{JsonMethods, Serialization} +import org.mockito.Mockito.when import org.opensearch.flint.OpenSearchTransactionSuite +import org.opensearch.flint.core.metadata.FlintMetadata import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._ import org.opensearch.flint.core.storage.FlintOpenSearchClient import org.opensearch.index.seqno.SequenceNumbers.{UNASSIGNED_PRIMARY_TERM, UNASSIGNED_SEQ_NO} import org.scalatest.matchers.should.Matchers +import org.scalatestplus.mockito.MockitoSugar.mock + +import org.apache.spark.sql.flint.config.FlintSparkConf.DATA_SOURCE_NAME class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { @@ -26,7 +31,8 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { override def beforeAll(): Unit = { super.beforeAll() - flintClient = new FlintOpenSearchClient(new FlintOptions(openSearchOptions.asJava)) + val options = openSearchOptions + (DATA_SOURCE_NAME.key -> testDataSourceName) + flintClient = new FlintOpenSearchClient(new FlintOptions(options.asJava)) } test("empty metadata log entry content") { @@ -44,6 +50,45 @@ class FlintTransactionITSuite extends OpenSearchTransactionSuite with Matchers { .commit(_ => {}) } + test("get index metadata with latest log entry") { + val testCreateTime = 1234567890123L + val flintMetadataLogEntry = FlintMetadataLogEntry( + id = testLatestId, + seqNo = UNASSIGNED_SEQ_NO, + primaryTerm = UNASSIGNED_PRIMARY_TERM, + createTime = testCreateTime, + state = ACTIVE, + dataSource = testDataSourceName, + error = "") + val metadata = mock[FlintMetadata] + when(metadata.getContent).thenReturn("{}") + when(metadata.indexSettings).thenReturn(None) + when(metadata.latestLogEntry).thenReturn(Some(flintMetadataLogEntry)) + + flintClient.createIndex(testFlintIndex, metadata) + createLatestLogEntry(flintMetadataLogEntry) + + val latest = flintClient.getIndexMetadata(testFlintIndex).latestLogEntry + latest.isDefined shouldBe true + latest.get.id shouldBe testLatestId + latest.get.createTime shouldBe testCreateTime + latest.get.dataSource shouldBe testDataSourceName + latest.get.error shouldBe "" + + deleteTestIndex(testFlintIndex) + } + + test("should get empty metadata log entry") { + val metadata = mock[FlintMetadata] + when(metadata.getContent).thenReturn("{}") + when(metadata.indexSettings).thenReturn(None) + flintClient.createIndex(testFlintIndex, metadata) + + flintClient.getIndexMetadata(testFlintIndex).latestLogEntry shouldBe empty + + deleteTestIndex(testFlintIndex) + } + test("should preserve original values when transition") { val testCreateTime = 1234567890123L createLatestLogEntry( diff 
--git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala index a77d261cd..38355c2f6 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexITSuite.scala @@ -5,6 +5,8 @@ package org.opensearch.flint.spark +import java.util.Base64 + import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.opensearch.flint.core.FlintVersion.current import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName @@ -19,6 +21,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { private val testTable = "spark_catalog.default.ci_test" private val testIndex = "name_and_age" private val testFlintIndex = getFlintIndexName(testIndex, testTable) + private val testLatestId = Base64.getEncoder.encodeToString(testFlintIndex.getBytes) override def beforeAll(): Unit = { super.beforeAll() @@ -63,6 +66,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite { | "auto_refresh": "false", | "incremental_refresh": "false" | }, + | "latestId": "$testLatestId", | "properties": { | "filterCondition": "age > 30" | } diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala new file mode 100644 index 000000000..ed4e18398 --- /dev/null +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexSqlITSuite.scala @@ -0,0 +1,126 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.spark + +import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName.AUTO_REFRESH +import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex +import org.opensearch.flint.spark.mv.FlintSparkMaterializedView +import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex + +import org.apache.spark.sql.Row + +class FlintSparkIndexSqlITSuite extends FlintSparkSuite { + + private val testTableName = "index_test" + private val testTableQualifiedName = s"spark_catalog.default.$testTableName" + private val testCoveringIndex = "name_age" + private val testMvIndexShortName = "mv1" + private val testMvQuery = s"SELECT name, age FROM $testTableQualifiedName" + + private val testSkippingFlintIndex = + FlintSparkSkippingIndex.getSkippingIndexName(testTableQualifiedName) + private val testCoveringFlintIndex = + FlintSparkCoveringIndex.getFlintIndexName(testCoveringIndex, testTableQualifiedName) + private val testMvIndex = s"spark_catalog.default.$testMvIndexShortName" + private val testMvFlintIndex = FlintSparkMaterializedView.getFlintIndexName(testMvIndex) + + override def beforeAll(): Unit = { + super.beforeAll() + createTimeSeriesTable(testTableQualifiedName) + } + + test("show all flint indexes in catalog and database") { + // Show in catalog + flint + .materializedView() + .name(testMvIndex) + .query(testMvQuery) + .create() + + flint + .coveringIndex() + .name(testCoveringIndex) + .onTable(testTableQualifiedName) + .addIndexColumns("name", "age") + .create() + + flint + .skippingIndex() + .onTable(testTableQualifiedName) + .addValueSet("name") + .create() + + checkAnswer( + sql(s"SHOW FLINT INDEX IN spark_catalog"), + Seq( + Row(testMvFlintIndex, "mv", "default", null, testMvIndexShortName, false, "active"), + Row( + 
testCoveringFlintIndex, + "covering", + "default", + testTableName, + testCoveringIndex, + false, + "active"), + Row(testSkippingFlintIndex, "skipping", "default", testTableName, null, false, "active"))) + + // Create index in other database + flint + .materializedView() + .name("spark_catalog.other.mv2") + .query(testMvQuery) + .create() + + // Show in catalog.database shouldn't show index in other database + checkAnswer( + sql(s"SHOW FLINT INDEX IN spark_catalog.default"), + Seq( + Row(testMvFlintIndex, "mv", "default", null, testMvIndexShortName, false, "active"), + Row( + testCoveringFlintIndex, + "covering", + "default", + testTableName, + testCoveringIndex, + false, + "active"), + Row(testSkippingFlintIndex, "skipping", "default", testTableName, null, false, "active"))) + + deleteTestIndex( + testMvFlintIndex, + testCoveringFlintIndex, + testSkippingFlintIndex, + FlintSparkMaterializedView.getFlintIndexName("spark_catalog.other.mv2")) + } + + test("should return empty when show flint index in empty database") { + checkAnswer(sql(s"SHOW FLINT INDEX IN spark_catalog.default"), Seq.empty) + } + + test("show flint index with auto refresh") { + flint + .coveringIndex() + .name(testCoveringIndex) + .onTable(testTableQualifiedName) + .addIndexColumns("name", "age") + .options(FlintSparkIndexOptions(Map(AUTO_REFRESH.toString -> "true"))) + .create() + flint.refreshIndex(testCoveringFlintIndex) + + checkAnswer( + sql(s"SHOW FLINT INDEX IN spark_catalog"), + Seq( + Row( + testCoveringFlintIndex, + "covering", + "default", + testTableName, + testCoveringIndex, + true, + "refreshing"))) + deleteTestIndex(testCoveringFlintIndex) + } +} diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala index 586b4e877..7ea8d381e 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkMaterializedViewITSuite.scala @@ -6,6 +6,7 @@ package org.opensearch.flint.spark import java.sql.Timestamp +import java.util.Base64 import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.opensearch.flint.core.FlintVersion.current @@ -21,6 +22,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { private val testTable = "spark_catalog.default.mv_test" private val testMvName = "spark_catalog.default.mv_test_metrics" private val testFlintIndex = getFlintIndexName(testMvName) + private val testLatestId = Base64.getEncoder.encodeToString(testFlintIndex.getBytes) private val testQuery = s""" | SELECT @@ -77,6 +79,7 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite { | "checkpoint_location": "s3://test/", | "watermark_delay": "30 Seconds" | }, + | "latestId": "$testLatestId", | "properties": {} | }, | "properties": { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala index 2b0907a5f..a4e7cfa79 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexITSuite.scala @@ -5,6 +5,8 @@ package org.opensearch.flint.spark +import java.util.Base64 + import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.json4s.native.JsonMethods._ import 
org.opensearch.client.RequestOptions @@ -32,6 +34,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { /** Test table and index name */ private val testTable = "spark_catalog.default.test" private val testIndex = getSkippingIndexName(testTable) + private val testLatestId = Base64.getEncoder.encodeToString(testIndex.getBytes) override def beforeEach(): Unit = { super.beforeEach() @@ -103,6 +106,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "auto_refresh": "false", | "incremental_refresh": "false" | }, + | "latestId": "$testLatestId", | "properties": {} | }, | "properties": { @@ -496,6 +500,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { // Prepare test table val testTable = "spark_catalog.default.data_type_table" val testIndex = getSkippingIndexName(testTable) + val testLatestId = Base64.getEncoder.encodeToString(testIndex.getBytes) sql(s""" | CREATE TABLE $testTable | ( @@ -644,6 +649,7 @@ class FlintSparkSkippingIndexITSuite extends FlintSparkSuite { | "auto_refresh": "false", | "incremental_refresh": "false" | }, + | "latestId": "$testLatestId", | "properties": {} | }, | "properties": {
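As the test fixtures above suggest, the expected `latestId` in the serialized index metadata is the Base64 encoding of the Flint index name. The sketch below merely restates that test-side derivation (it is not taken from the production encoder):

```scala
import java.util.Base64

// Restates the latestId derivation used in the IT suite fixtures above:
// the expected id is the Base64 encoding of the Flint index name.
object LatestIdDerivation {
  def latestId(flintIndexName: String): String =
    Base64.getEncoder.encodeToString(flintIndexName.getBytes)

  def main(args: Array[String]): Unit = {
    // e.g. the skipping index name used by FlintSparkSkippingIndexITSuite
    println(latestId("flint_spark_catalog_default_test_skipping_index"))
  }
}
```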