Merge branch 'main' into add-bloom-filter-sql-support
dai-chen committed Mar 15, 2024
2 parents e5c12af + 7382c95 commit b1160fd
Showing 24 changed files with 534 additions and 25 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -26,15 +26,15 @@ Version compatibility:
To use this application, you can run Spark with Flint extension:

```
spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintSparkExtensions"
spark-sql --conf "spark.sql.extensions=org.opensearch.flint.spark.FlintSparkExtensions"
```

## PPL Extension Usage

To use PPL to Spark translation, you can run Spark with PPL extension:

```
spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions"
spark-sql --conf "spark.sql.extensions=org.opensearch.flint.spark.FlintPPLSparkExtensions"
```

### Running With Both Extensions
4 changes: 2 additions & 2 deletions docs/PPL-on-Spark.md
@@ -42,7 +42,7 @@ bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPS
To use PPL to Spark translation, you can run Spark with PPL extension:

```
spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensions"
spark-sql --conf "spark.sql.extensions=org.opensearch.flint.spark.FlintPPLSparkExtensions"
```

### Running With both Flint & PPL Extensions
@@ -51,7 +51,7 @@ classpath.

Next, configure both extensions:
```
spark-sql --conf "spark.sql.extensions='org.opensearch.flint.FlintPPLSparkExtensions, org.opensearch.flint.FlintSparkExtensions'"
spark-sql --conf "spark.sql.extensions='org.opensearch.flint.spark.FlintPPLSparkExtensions, org.opensearch.flint.spark.FlintSparkExtensions'"
```

Once this is done, Spark will allow both extensions to parse the query (SQL / PPL) and execute it correctly.
28 changes: 28 additions & 0 deletions docs/index.md
@@ -291,6 +291,34 @@ DROP MATERIALIZED VIEW alb_logs_metrics
VACUUM MATERIALIZED VIEW alb_logs_metrics
```

#### All Indexes

- **Show Flint Indexes**: Displays all Flint indexes with their information. The output contains the following columns:
  - flint_index_name: the full OpenSearch index name
  - kind: type of the index (skipping / covering / mv)
  - database: database name of the index
  - table: source table name, for skipping and covering indexes
  - index_name: user-defined name, for covering indexes and materialized views
  - auto_refresh: whether auto refresh is enabled (true / false)
  - status: current status of the index

```sql
SHOW FLINT [INDEX|INDEXES] IN catalog[.database]
```

Example:
```
sql> SHOW FLINT INDEXES IN spark_catalog.default;
fetched rows / total rows = 3/3
+-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+
| flint_index_name | kind | database | table | index_name | auto_refresh | status |
|-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------|
| flint_spark_catalog_default_http_count_view | mv | default | NULL | http_count_view | false | active |
| flint_spark_catalog_default_http_logs_skipping_index | skipping | default | http_logs | NULL | true | refreshing |
| flint_spark_catalog_default_http_logs_status_clientip_index | covering | default | http_logs | status_clientip | false | active |
+-------------------------------------------------------------+----------+----------+-----------+-----------------+--------------+------------+
```
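
The same statement can also be issued from Spark code once the Flint SQL extension is enabled. A minimal sketch (session settings and names below are illustrative, not prescribed by this change):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative session setup; the catalog and database mirror the example output above.
val spark = SparkSession
  .builder()
  .config("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
  .getOrCreate()

// Returns one row per Flint index with the columns described above.
spark.sql("SHOW FLINT INDEXES IN spark_catalog.default").show(truncate = false)
```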

#### Create Index Options

Users can provide the following options in the `WITH` clause of a create statement:
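
As an illustrative sketch of the `WITH` clause shape (the table, column, and option values are hypothetical; `auto_refresh` is the option referenced in the index listing above):

```scala
// Hypothetical example: create a skipping index and enable auto refresh via a WITH option.
spark.sql("""
  CREATE SKIPPING INDEX ON spark_catalog.default.http_logs
  ( status VALUE_SET )
  WITH ( auto_refresh = true )
""")
```
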
FlintOptions.java
@@ -78,6 +78,8 @@ public class FlintOptions implements Serializable {
public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000;

public static final int DEFAULT_INACTIVITY_LIMIT_MILLIS = 10 * 60 * 1000;

public static final String DATA_SOURCE_NAME = "spark.flint.datasource.name";

public FlintOptions(Map<String, String> options) {
this.options = options;
@@ -133,4 +135,8 @@ public String getPassword() {
public int getSocketTimeoutMillis() {
return Integer.parseInt(options.getOrDefault(SOCKET_TIMEOUT_MILLIS, String.valueOf(DEFAULT_SOCKET_TIMEOUT_MILLIS)));
}

public String getDataSourceName() {
return options.getOrDefault(DATA_SOURCE_NAME, "");
}
}
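
For context, a small usage sketch of the new option (values are illustrative): `getDataSourceName` reads `spark.flint.datasource.name` from the options map and falls back to an empty string when the key is absent.

```scala
import scala.collection.JavaConverters._

import org.opensearch.flint.core.FlintOptions

// Illustrative values; the key is FlintOptions.DATA_SOURCE_NAME ("spark.flint.datasource.name").
val withName = new FlintOptions(Map(FlintOptions.DATA_SOURCE_NAME -> "mys3").asJava)
assert(withName.getDataSourceName == "mys3")

// An absent key falls back to the empty string.
val withoutName = new FlintOptions(Map.empty[String, String].asJava)
assert(withoutName.getDataSourceName == "")
```
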
FlintMetadata.scala
@@ -10,6 +10,7 @@ import java.util
import org.opensearch.flint.core.FlintVersion
import org.opensearch.flint.core.FlintVersion.current
import org.opensearch.flint.core.metadata.FlintJsonHelper._
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry

/**
* Flint metadata follows Flint index specification and defines metadata for a Flint index
@@ -32,8 +33,10 @@ case class FlintMetadata(
properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef],
/** Flint index schema */
schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef],
/** Optional latest metadata log entry */
/** Optional latest metadata log entry id */
latestId: Option[String] = None,
/** Optional latest metadata log entry */
latestLogEntry: Option[FlintMetadataLogEntry] = None,
/** Optional Flint index settings. TODO: move elsewhere? */
indexSettings: Option[String]) {

@@ -79,6 +82,26 @@

object FlintMetadata {

/**
* Construct Flint metadata with JSON content, index settings, and latest log entry.
*
* @param content
* JSON content
* @param settings
* index settings
* @param latestLogEntry
* latest metadata log entry
* @return
* Flint metadata
*/
def apply(
content: String,
settings: String,
latestLogEntry: FlintMetadataLogEntry): FlintMetadata = {
val metadata = FlintMetadata(content, settings)
metadata.copy(latestLogEntry = Option(latestLogEntry))
}

/**
* Construct Flint metadata with JSON content and index settings.
*
@@ -153,6 +176,8 @@ object FlintMetadata {
private var indexedColumns: Array[util.Map[String, AnyRef]] = Array()
private var properties: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]()
private var schema: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]()
private var latestId: Option[String] = None
private var latestLogEntry: Option[FlintMetadataLogEntry] = None
private var indexSettings: Option[String] = None

def version(version: FlintVersion): this.type = {
@@ -215,6 +240,12 @@
this
}

def latestLogEntry(entry: FlintMetadataLogEntry): this.type = {
this.latestId = Option(entry.id)
this.latestLogEntry = Option(entry)
this
}

def indexSettings(indexSettings: String): this.type = {
this.indexSettings = Option(indexSettings)
this
@@ -231,7 +262,9 @@
options = options,
properties = properties,
schema = schema,
indexSettings = indexSettings)
indexSettings = indexSettings,
latestId = latestId,
latestLogEntry = latestLogEntry)
}
}
}
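
A brief sketch of how the new builder hook might be used. It assumes an already-loaded `FlintMetadataLogEntry` named `entry` and that the builder is obtained via `FlintMetadata.builder()` (not shown in this diff); setting the log entry records both the entry itself and its id.

```scala
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry

// Sketch: attach the latest metadata log entry while building Flint metadata.
def metadataWithLatest(entry: FlintMetadataLogEntry): FlintMetadata =
  FlintMetadata
    .builder()
    .latestLogEntry(entry) // also sets latestId to Option(entry.id)
    .build()
```
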
FlintOpenSearchClient.java
@@ -17,6 +17,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
@@ -93,8 +94,8 @@ public FlintOpenSearchClient(FlintOptions options) {
}

@Override
public <T> OptimisticTransaction<T> startTransaction(String indexName, String dataSourceName,
boolean forceInit) {
public <T> OptimisticTransaction<T> startTransaction(
String indexName, String dataSourceName, boolean forceInit) {
LOG.info("Starting transaction on index " + indexName + " and data source " + dataSourceName);
String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX
: META_LOG_NAME_PREFIX + "_" + dataSourceName;
@@ -164,7 +165,8 @@ public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
GetIndexResponse response = client.getIndex(request, RequestOptions.DEFAULT);

return Arrays.stream(response.getIndices())
.map(index -> FlintMetadata.apply(
.map(index -> constructFlintMetadata(
index,
response.getMappings().get(index).source().toString(),
response.getSettings().get(index).toString()))
.collect(Collectors.toList());
@@ -183,7 +185,7 @@ public FlintMetadata getIndexMetadata(String indexName) {

MappingMetadata mapping = response.getMappings().get(osIndexName);
Settings settings = response.getSettings().get(osIndexName);
return FlintMetadata.apply(mapping.source().string(), settings.toString());
return constructFlintMetadata(indexName, mapping.source().string(), settings.toString());
} catch (Exception e) {
throw new IllegalStateException("Failed to get Flint index metadata for " + osIndexName, e);
}
@@ -287,6 +289,34 @@ public IRestHighLevelClient createClient() {
return new RestHighLevelClientWrapper(new RestHighLevelClient(restClientBuilder));
}

/*
* Constructs Flint metadata with latest metadata log entry attached if it's available.
* It relies on FlintOptions to provide data source name.
*/
private FlintMetadata constructFlintMetadata(String indexName, String mapping, String settings) {
String dataSourceName = options.getDataSourceName();
String metaLogIndexName = dataSourceName.isEmpty() ? META_LOG_NAME_PREFIX
: META_LOG_NAME_PREFIX + "_" + dataSourceName;
Optional<FlintMetadataLogEntry> latest = Optional.empty();

try (IRestHighLevelClient client = createClient()) {
if (client.doesIndexExist(new GetIndexRequest(metaLogIndexName), RequestOptions.DEFAULT)) {
LOG.info("Found metadata log index " + metaLogIndexName);
FlintOpenSearchMetadataLog metadataLog =
new FlintOpenSearchMetadataLog(this, indexName, metaLogIndexName);
latest = metadataLog.getLatest();
}
} catch (IOException e) {
throw new IllegalStateException("Failed to check if index metadata log index exists " + metaLogIndexName, e);
}

if (latest.isEmpty()) {
return FlintMetadata.apply(mapping, settings);
} else {
return FlintMetadata.apply(mapping, settings, latest.get());
}
}

/*
* Because OpenSearch requires all lowercase letters in index name, we have to
* lowercase all letters in the given Flint index name.

FlintSparkConf.scala
Expand Up @@ -150,6 +150,10 @@ object FlintSparkConf {
FlintConfig(s"spark.flint.datasource.name")
.doc("data source name")
.createOptional()
val QUERY =
FlintConfig("spark.flint.job.query")
.doc("Flint query for batch and streaming job")
.createOptional()
val JOB_TYPE =
FlintConfig(s"spark.flint.job.type")
.doc("Flint job type. Including interactive and streaming")
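
As a hedged illustration of the new `QUERY` setting (the configuration values below are placeholders), the query is passed to a Flint batch or streaming job as an ordinary Spark configuration entry:

```scala
import org.apache.spark.SparkConf

// Placeholder values: a Flint job would read these keys back through FlintSparkConf.
val conf = new SparkConf()
  .set("spark.flint.datasource.name", "mys3")
  .set("spark.flint.job.query", "SELECT status, count(*) FROM mys3.default.http_logs GROUP BY status")
```
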
FlintSparkIndex.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark
import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.flint.datatype.FlintDataType
@@ -28,6 +29,11 @@ trait FlintSparkIndex {
*/
val options: FlintSparkIndexOptions

/**
* Latest metadata log entry for index
*/
val latestLogEntry: Option[FlintMetadataLogEntry]

/**
* @return
* Flint index name
@@ -151,6 +157,12 @@ object FlintSparkIndex {
if (settings.isDefined) {
builder.indexSettings(settings.get)
}

// Optional latest metadata log entry
val latestLogEntry = index.latestLogEntry
if (latestLogEntry.isDefined) {
builder.latestLogEntry(latestLogEntry.get)
}
builder
}

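
A short sketch of reading the new field back from a loaded index. It assumes a `FlintSpark` instance named `flint` whose `describeIndex` returns `Option[FlintSparkIndex]` (an assumption, not shown in this diff); the index name is taken from the example output earlier.

```scala
// Sketch: inspect the latest metadata log entry, if any, attached to a loaded index.
val indexName = "flint_spark_catalog_default_http_logs_skipping_index"
flint.describeIndex(indexName).flatMap(_.latestLogEntry) match {
  case Some(entry) => println(s"Latest metadata log entry id: ${entry.id}")
  case None        => println("No metadata log entry attached")
}
```
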
FlintSparkIndexFactory.scala
@@ -40,6 +40,7 @@ object FlintSparkIndexFactory {
def create(metadata: FlintMetadata): FlintSparkIndex = {
val indexOptions = FlintSparkIndexOptions(
metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap)
val latestLogEntry = metadata.latestLogEntry

// Convert generic Map[String,AnyRef] in metadata to specific data structure in Flint index
metadata.kind match {
@@ -69,7 +70,7 @@
throw new IllegalStateException(s"Unknown skipping strategy: $other")
}
}
FlintSparkSkippingIndex(metadata.source, strategies, indexOptions)
FlintSparkSkippingIndex(metadata.source, strategies, indexOptions, latestLogEntry)
case COVERING_INDEX_TYPE =>
FlintSparkCoveringIndex(
metadata.name,
@@ -78,15 +79,17 @@
getString(colInfo, "columnName") -> getString(colInfo, "columnType")
}.toMap,
getOptString(metadata.properties, "filterCondition"),
indexOptions)
indexOptions,
latestLogEntry)
case MV_INDEX_TYPE =>
FlintSparkMaterializedView(
metadata.name,
metadata.source,
metadata.indexedColumns.map { colInfo =>
getString(colInfo, "columnName") -> getString(colInfo, "columnType")
}.toMap,
indexOptions)
indexOptions,
latestLogEntry)
}
}

FlintSparkCoveringIndex.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark.covering
import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark._
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, quotedTableName}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
@@ -24,13 +25,20 @@ import org.apache.spark.sql._
* source table name
* @param indexedColumns
* indexed column list
* @param filterCondition
* filtering condition
* @param options
* index options
* @param latestLogEntry
* latest metadata log entry for index
*/
case class FlintSparkCoveringIndex(
indexName: String,
tableName: String,
indexedColumns: Map[String, String],
filterCondition: Option[String] = None,
override val options: FlintSparkIndexOptions = empty)
override val options: FlintSparkIndexOptions = empty,
override val latestLogEntry: Option[FlintMetadataLogEntry] = None)
extends FlintSparkIndex {

require(indexedColumns.nonEmpty, "indexed columns must not be empty")

FlintSparkMaterializedView.scala
@@ -11,6 +11,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter
import scala.collection.convert.ImplicitConversions.`map AsScala`

import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, StreamingRefresh}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
@@ -37,12 +38,15 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
* output schema
* @param options
* index options
* @param latestLogEntry
* latest metadata log entry for index
*/
case class FlintSparkMaterializedView(
mvName: String,
query: String,
outputSchema: Map[String, String],
override val options: FlintSparkIndexOptions = empty)
override val options: FlintSparkIndexOptions = empty,
override val latestLogEntry: Option[FlintMetadataLogEntry] = None)
extends FlintSparkIndex
with StreamingRefresh {
