Implement SHOW FLINT INDEX statement #276

Merged: 13 commits, Mar 15, 2024
docs/index.md (+28, -0)
@@ -278,6 +278,34 @@ DROP MATERIALIZED VIEW alb_logs_metrics
VACUUM MATERIALIZED VIEW alb_logs_metrics
```

#### All Indexes

- **Show Flint Indexes**: Displays all Flint indexes with their summary information. The output contains the following columns:
  - flint_index_name: the full OpenSearch index name
  - kind: the index type (skipping / covering / mv)
  - database: the database name of the index
  - table: the source table name (skipping and covering indexes only)
  - index_name: the user-defined name (covering index and materialized view only)
  - auto_refresh: whether auto refresh is enabled for the index (true / false)
  - status: the current status of the index

```sql
SHOW FLINT [INDEX|INDEXES] IN catalog[.database]
```

Example:
```
sql> SHOW FLINT INDEXES IN spark_catalog.default;
fetched rows / total rows = 3/3
+-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+
| flint_index_name | kind | database | table | index_name | auto_refresh | status |
|-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------|
| flint_spark_catalog_default_http_count_view | mv | default | NULL | http_count_view | false | active |
| flint_spark_catalog_default_http_logs_skipping_index | skipping | default | http_logs | NULL | true | refreshing |
| flint_spark_catalog_default_http_logs_status_clientip_index | covering | default | http_logs | status_clientip | false | active |
+-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+
```

#### Create Index Options

Users can provide the following options in the `WITH` clause of a create statement:
FlintOptions.java
@@ -78,6 +78,8 @@ public class FlintOptions implements Serializable {
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000;

public static final int DEFAULT_INACTIVITY_LIMIT_MILLIS = 10 * 60 * 1000;

public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name";

public FlintOptions(Map<String, String> options) {
this.options = options;
@@ -133,4 +135,8 @@ public String getPassword() {
public int getSocketTimeoutMillis() {
return Integer.parseInt(options.getOrDefault(SOCKET_TIMEOUT_MILLIS, String.valueOf(DEFAULT_SOCKET_TIMEOUT_MILLIS)));
}

public String getDataSourceName() {
return options.getOrDefault(DATA_SOURCE_NAME, "");
}
}
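
For orientation, a minimal sketch of how the new option behaves. The `org.opensearch.flint.core` package for `FlintOptions` is an assumption based on the other imports in this PR; only the constructor and accessor shown in the diff are used:

```scala
import scala.collection.JavaConverters._

import org.opensearch.flint.core.FlintOptions

// The key mirrors the DATA_SOURCE_NAME constant added above.
val options = new FlintOptions(Map("spark.flint.datasource.name" -> "mys3").asJava)
assert(options.getDataSourceName == "mys3")

// When the option is absent, getDataSourceName falls back to "", which later
// selects the un-suffixed metadata log index name in FlintOpenSearchClient.
val defaults = new FlintOptions(Map.empty[String, String].asJava)
assert(defaults.getDataSourceName == "")
```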
FlintMetadata.scala
@@ -10,6 +10,7 @@ import java.util
import org.opensearch.flint.core.FlintVersion
import org.opensearch.flint.core.FlintVersion.current
import org.opensearch.flint.core.metadata.FlintJsonHelper._
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry

/**
* Flint metadata follows Flint index specification and defines metadata for a Flint index
@@ -32,8 +33,10 @@ case class FlintMetadata(
properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef],
/** Flint index schema */
schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef],
/** Optional latest metadata log entry */
/** Optional latest metadata log entry id */
latestId: Option[String] = None,
/** Optional latest metadata log entry */
latestLogEntry: Option[FlintMetadataLogEntry] = None,
/** Optional Flint index settings. TODO: move elsewhere? */
indexSettings: Option[String]) {

@@ -79,6 +82,26 @@

object FlintMetadata {

/**
* Construct Flint metadata with JSON content, index settings, and latest log entry.
*
* @param content
* JSON content
* @param settings
* index settings
* @param latestLogEntry
* latest metadata log entry
* @return
* Flint metadata
*/
def apply(
content: String,
settings: String,
latestLogEntry: FlintMetadataLogEntry): FlintMetadata = {
val metadata = FlintMetadata(content, settings)
metadata.copy(latestLogEntry = Option(latestLogEntry))
}

/**
* Construct Flint metadata with JSON content and index settings.
*
@@ -153,6 +176,8 @@ object FlintMetadata {
private var indexedColumns: Array[util.Map[String, AnyRef]] = Array()
private var properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]()
private var schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]()
private var latestId: Option[String] = None
private var latestLogEntry: Option[FlintMetadataLogEntry] = None
private var indexSettings: Option[String] = None

def version(version: FlintVersion): this.type = {
@@ -215,6 +240,12 @@
this
}

def latestLogEntry(entry: FlintMetadataLogEntry): this.type = {
this.latestId = Option(entry.id)
this.latestLogEntry = Option(entry)
this
}

def indexSettings(indexSettings: String): this.type = {
this.indexSettings = Option(indexSettings)
this
@@ -231,7 +262,9 @@
options = options,
properties = properties,
schema = schema,
indexSettings = indexSettings)
indexSettings = indexSettings,
latestId = latestId,
latestLogEntry = latestLogEntry)
}
}
}
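
Putting the new constructors together, a short sketch of both ways to attach a log entry. The `builder()` factory on the companion object is assumed; only builder methods visible in this diff are called:

```scala
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry

// Via the builder: latestLogEntry(entry) populates both latestId (from entry.id)
// and latestLogEntry on the resulting metadata.
def buildWithEntry(entry: FlintMetadataLogEntry): FlintMetadata =
  FlintMetadata.builder().latestLogEntry(entry).build()

// Via the new apply overload, as used when deserializing an OpenSearch response:
def parseWithEntry(mapping: String, settings: String, entry: FlintMetadataLogEntry): FlintMetadata =
  FlintMetadata(mapping, settings, entry)
```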
FlintOpenSearchClient.java
@@ -17,6 +17,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
@@ -93,8 +94,8 @@ public FlintOpenSearchClient(FlintOptions options) {
}

@Override
public <T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourceName,
boolean forceInit) {
public <T> OptimisticTransaction<T> startTransaction(
String indexName, String dataSourceName, boolean forceInit) {
LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName);
String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX
: META_LOG_NAME_PREFIX + "_" + dataSourceName;
@@ -164,7 +165,8 @@ public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);

return Arrays.stream(response.getIndices())
.map(index -> FlintMetadata.apply(
.map(index -> constructFlintMetadata(
index,
response.getMappings().get(index).source().toString(),
response.getSettings().get(index).toString()))
.collect(Collectors.toList());
@@ -183,7 +185,7 @@ public FlintMetadata getIndexMetadata(String indexName) {

MappingMetadata mapping = response.getMappings().get(osIndexName);
Settings settings = response.getSettings().get(osIndexName);
return FlintMetadata.apply(mapping.source().string(), settings.toString());
return constructFlintMetadata(indexName, mapping.source().string(), settings.toString());
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e);
}
@@ -287,6 +289,34 @@ public IRestHighLevelClient createClient() {
return new RestHighLevelClientWrapper(new RestHighLevelClient(restClientBuilder));
}

  /*
   * Constructs Flint metadata with the latest metadata log entry attached, if available.
   * Relies on FlintOptions to provide the data source name.
   */
private FlintMetadata constructFlintMetadata(String indexName, String mapping, String settings) {
String dataSourceName = options.getDataSourceName();
String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX
: META_LOG_NAME_PREFIX + "_" + dataSourceName;
Optional<FlintMetadataLogEntry> latest = Optional.empty();

try (IRestHighLevelClient client = createClient()) {
if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
LOG.info("Found metadata log index " + metaLogIndexName);
FlintOpenSearchMetadataLog metadataLog =
new FlintOpenSearchMetadataLog(this, indexName, metaLogIndexName);
latest = metadataLog.getLatest();
}
} catch (IOException e) {
throw new IllegalStateException("Failed to check if index metadata log index exists " + metaLogIndexName, e);
}

if (latest.isEmpty()) {
return FlintMetadata.apply(mapping, settings);
} else {
return FlintMetadata.apply(mapping, settings, latest.get());
}
}

/*
* Because OpenSearch requires all lowercase letters in index name, we have to
* lowercase all letters in the given Flint index name.
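Both `startTransaction` and the new `constructFlintMetadata` derive the metadata log index name with the same rule. A sketch of that convention; the actual `META_LOG_NAME_PREFIX` value is not visible in this excerpt, so `.query_execution_request` is an assumption:

```scala
// Metadata log index name: prefix alone for the default data source,
// prefix + "_" + data source name otherwise.
def metaLogIndexName(dataSourceName: String, prefix: String = ".query_execution_request"): String =
  if (dataSourceName.isEmpty) prefix else s"${prefix}_$dataSourceName"

assert(metaLogIndexName("") == ".query_execution_request")
assert(metaLogIndexName("mys3") == ".query_execution_request_mys3")
```
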
FlintSparkIndex.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark
import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.flint.datatype.FlintDataType
@@ -28,6 +29,11 @@ trait FlintSparkIndex {
*/
val options: FlintSparkIndexOptions

  /**
   * Latest metadata log entry for the index.
   */
  val latestLogEntry: Option[FlintMetadataLogEntry]

/**
* @return
* Flint index name
@@ -151,6 +157,12 @@ object FlintSparkIndex {
if (settings.isDefined) {
builder.indexSettings(settings.get)
}

// Optional latest metadata log entry
val latestLogEntry = index.latestLogEntry
if (latestLogEntry.isDefined) {
builder.latestLogEntry(latestLogEntry.get)
}
builder
}

FlintSparkIndexFactory.scala
@@ -40,6 +40,7 @@ object FlintSparkIndexFactory {
def create(metadata: FlintMetadata): FlintSparkIndex = {
val indexOptions = FlintSparkIndexOptions(
metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap)
val latestLogEntry = metadata.latestLogEntry

// Convert generic Map[String,AnyRef] in metadata to specific data structure in Flint index
metadata.kind match {
@@ -69,7 +70,7 @@
throw new IllegalStateException(s"Unknown skipping strategy: $other")
}
}
FlintSparkSkippingIndex(metadata.source, strategies, indexOptions)
FlintSparkSkippingIndex(metadata.source, strategies, indexOptions, latestLogEntry)
case COVERING_INDEX_TYPE =>
FlintSparkCoveringIndex(
metadata.name,
@@ -78,15 +79,17 @@
getString(colInfo, "columnName") -> getString(colInfo, "columnType")
}.toMap,
getOptString(metadata.properties, "filterCondition"),
indexOptions)
indexOptions,
latestLogEntry)
case MV_INDEX_TYPE =>
FlintSparkMaterializedView(
metadata.name,
metadata.source,
metadata.indexedColumns.map { colInfo =>
getString(colInfo, "columnName") -> getString(colInfo, "columnType")
}.toMap,
indexOptions)
indexOptions,
latestLogEntry)
}
}

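With `latestLogEntry` threaded through the factory, any index deserialized from metadata now carries its log entry. A small sketch of consuming it; imports assume the `org.opensearch.flint.spark` package shown above, and only the `id` field of `FlintMetadataLogEntry` is referenced since no other fields appear in this excerpt:

```scala
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexFactory}

def describe(metadata: FlintMetadata): Unit = {
  val index: FlintSparkIndex = FlintSparkIndexFactory.create(metadata)
  index.latestLogEntry match {
    case Some(entry) => println(s"latest metadata log entry id: ${entry.id}")
    case None        => println("no log entry (e.g. metadata log index not found)")
  }
}
```
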
FlintSparkCoveringIndex.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.covering
import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark._
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, quotedTableName}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
@@ -24,13 +25,20 @@ import org.apache.spark.sql._
* source table name
* @param indexedColumns
* indexed column list
* @param filterCondition
* filtering condition
* @param options
* index options
* @param latestLogEntry
* latest metadata log entry for index
*/
case class FlintSparkCoveringIndex(
indexName: String,
tableName: String,
indexedColumns: Map[String, String],
filterCondition: Option[String] = None,
override val options: FlintSparkIndexOptions = empty)
override val options: FlintSparkIndexOptions = empty,
override val latestLogEntry: Option[FlintMetadataLogEntry] = None)
extends FlintSparkIndex {

require(indexedColumns.nonEmpty, "indexed columns must not be empty")
FlintSparkMaterializedView.scala
@@ -11,6 +11,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.convert.ImplicitConversions.`map AsScala`

import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, StreamingRefresh}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
@@ -37,12 +38,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
* output schema
* @param options
* index options
* @param latestLogEntry
* latest metadata log entry for index
*/
case class FlintSparkMaterializedView(
mvName: String,
query: String,
outputSchema: Map[String, String],
override val options: FlintSparkIndexOptions = empty)
override val options: FlintSparkIndexOptions = empty,
override val latestLogEntry: Option[FlintMetadataLogEntry] = None)
extends FlintSparkIndex
with StreamingRefresh {

FlintSparkSkippingIndex.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.skipping
import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark._
import org.opensearch.flint.spark.FlintSparkIndex._
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
@@ -28,11 +29,16 @@ import org.apache.spark.sql.functions.{col, input_file_name, sha1}
* source table name
* @param indexedColumns
* indexed column list
* @param options
* index options
* @param latestLogEntry
* latest metadata log entry for index
*/
case class FlintSparkSkippingIndex(
tableName: String,
indexedColumns: Seq[FlintSparkSkippingStrategy],
override val options: FlintSparkIndexOptions = empty)
override val options: FlintSparkIndexOptions = empty,
override val latestLogEntry: Option[FlintMetadataLogEntry] = None)
extends FlintSparkIndex {

require(indexedColumns.nonEmpty, "indexed columns must not be empty")
FlintSparkSqlAstBuilder.scala
@@ -9,6 +9,7 @@ import org.antlr.v4.runtime.ParserRuleContext
import org.antlr.v4.runtime.tree.{ParseTree, RuleNode}
import org.opensearch.flint.spark.FlintSpark
import org.opensearch.flint.spark.sql.covering.FlintSparkCoveringIndexAstBuilder
import org.opensearch.flint.spark.sql.index.FlintSparkIndexAstBuilder
import org.opensearch.flint.spark.sql.job.FlintSparkIndexJobAstBuilder
import org.opensearch.flint.spark.sql.mv.FlintSparkMaterializedViewAstBuilder
import org.opensearch.flint.spark.sql.skipping.FlintSparkSkippingIndexAstBuilder
@@ -26,6 +27,7 @@ class FlintSparkSqlAstBuilder
with FlintSparkSkippingIndexAstBuilder
with FlintSparkCoveringIndexAstBuilder
with FlintSparkMaterializedViewAstBuilder
with FlintSparkIndexAstBuilder
with FlintSparkIndexJobAstBuilder
with SparkSqlAstBuilder {

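End to end, the new grammar handled by the mixed-in `FlintSparkIndexAstBuilder` can be exercised through a Spark session once the Flint extensions are registered. A minimal usage sketch; the extension class name is an assumption, adjust to your deployment:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("show-flint-indexes")
  // Assumed extension class that registers FlintSparkSqlAstBuilder's parser.
  .config("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
  .getOrCreate()

// Statement added by this PR; columns are documented in docs/index.md above:
// flint_index_name, kind, database, table, index_name, auto_refresh, status.
spark.sql("SHOW FLINT INDEXES IN spark_catalog.default").show()
```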