Merge branch 'main' into change-logical-delete
dai-chen committed Jan 19, 2024
2 parents 46b5b90 + dae36ec commit b5ddc8d
Showing 19 changed files with 289 additions and 67 deletions.
1 change: 0 additions & 1 deletion .github/workflows/backport.yml
@@ -26,4 +26,3 @@ jobs:
with:
github_token: ${{ steps.github_app_token.outputs.token }}
head_template: backport/backport-<%= number %>-to-<%= base %>
files_to_skip: 'CHANGELOG.md'
1 change: 1 addition & 0 deletions .github/workflows/snapshot-publish.yml
@@ -5,6 +5,7 @@ on:
push:
branches:
- main
- 0.*

jobs:
build-and-publish-snapshots:
13 changes: 9 additions & 4 deletions docs/index.md
@@ -136,7 +136,7 @@ The default maximum size for the value set is 100. In cases where a file contain
```sql
CREATE SKIPPING INDEX [IF NOT EXISTS]
ON <object>
( column <index_type> [, ...] )
( column <skip_type> <skip_params> [, ...] )
WHERE <filter_predicate>
WITH ( options )

@@ -151,18 +151,23 @@ VACUUM SKIPPING INDEX ON <object>
<object> ::= [db_name].[schema_name].table_name
```

Skipping index type:
A skipping index type consists of a skip type name and optional parameters:

```sql
<index_type> ::= { PARTITION, VALUE_SET, MIN_MAX }
<skip_type> ::= { PARTITION, VALUE_SET, MIN_MAX }

<skip_params> ::= ( param1, param2, ... )
```

Example:

```sql
CREATE SKIPPING INDEX ON alb_logs
(
elb_status_code VALUE_SET
time PARTITION,
elb_status_code VALUE_SET,
client_ip VALUE_SET(20),
request_processing_time MIN_MAX
)
WHERE time > '2023-04-01 00:00:00'

@@ -17,6 +17,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -71,6 +72,13 @@ public class FlintOpenSearchClient implements FlintClient {
new NamedXContentRegistry(new SearchModule(Settings.builder().build(),
new ArrayList<>()).getNamedXContents());

/**
* Invalid index name characters to percent-encode,
* excluding '*' because it's reserved for pattern matching.
*/
private final static Set<Character> INVALID_INDEX_NAME_CHARS =
Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<');

/**
* Metadata log index name prefix
*/
@@ -121,7 +129,7 @@ public void createIndex(String indexName, FlintMetadata metadata) {

protected void createIndex(String indexName, String mapping, Option<String> settings) {
LOG.info("Creating Flint index " + indexName);
String osIndexName = toLowercase(indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClient client = createClient()) {
CreateIndexRequest request = new CreateIndexRequest(osIndexName);
request.mapping(mapping, XContentType.JSON);
@@ -137,7 +145,7 @@ protected void createIndex(String indexName, String mapping, Option<String> sett
@Override
public boolean exists(String indexName) {
LOG.info("Checking if Flint index exists " + indexName);
String osIndexName = toLowercase(indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClient client = createClient()) {
return client.indices().exists(new GetIndexRequest(osIndexName), RequestOptions.DEFAULT);
} catch (IOException e) {
@@ -148,7 +156,7 @@ public boolean exists(String indexName) {
@Override
public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
LOG.info("Fetching all Flint index metadata for pattern " + indexNamePattern);
String osIndexNamePattern = toLowercase(indexNamePattern);
String osIndexNamePattern = sanitizeIndexName(indexNamePattern);
try (RestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexNamePattern);
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
@@ -166,7 +174,7 @@ public List<FlintMetadata> getAllIndexMetadata(String indexNamePattern) {
@Override
public FlintMetadata getIndexMetadata(String indexName) {
LOG.info("Fetching Flint index metadata for " + indexName);
String osIndexName = toLowercase(indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClient client = createClient()) {
GetIndexRequest request = new GetIndexRequest(osIndexName);
GetIndexResponse response = client.indices().get(request, RequestOptions.DEFAULT);
@@ -182,7 +190,7 @@ public FlintMetadata getIndexMetadata(String indexName) {
@Override
public void deleteIndex(String indexName) {
LOG.info("Deleting Flint index " + indexName);
String osIndexName = toLowercase(indexName);
String osIndexName = sanitizeIndexName(indexName);
try (RestHighLevelClient client = createClient()) {
DeleteIndexRequest request = new DeleteIndexRequest(osIndexName);

@@ -211,7 +219,7 @@ public FlintReader createReader(String indexName, String query) {
queryBuilder = AbstractQueryBuilder.parseInnerQueryBuilder(parser);
}
return new OpenSearchScrollReader(createClient(),
toLowercase(indexName),
sanitizeIndexName(indexName),
new SearchSourceBuilder().query(queryBuilder),
options);
} catch (IOException e) {
@@ -221,7 +229,7 @@

public FlintWriter createWriter(String indexName) {
LOG.info("Creating Flint index writer for " + indexName);
return new OpenSearchWriter(createClient(), toLowercase(indexName), options.getRefreshPolicy());
return new OpenSearchWriter(createClient(), sanitizeIndexName(indexName), options.getRefreshPolicy());
}

@Override
@@ -287,4 +295,31 @@ private String toLowercase(String indexName) {

return indexName.toLowerCase(Locale.ROOT);
}

/*
* Percent-encode invalid OpenSearch index name characters.
*/
private String percentEncode(String indexName) {
Objects.requireNonNull(indexName);

StringBuilder builder = new StringBuilder(indexName.length());
for (char ch : indexName.toCharArray()) {
if (INVALID_INDEX_NAME_CHARS.contains(ch)) {
builder.append(String.format("%%%02X", (int) ch));
} else {
builder.append(ch);
}
}
return builder.toString();
}

/*
* Sanitize index name to comply with OpenSearch index name restrictions.
*/
private String sanitizeIndexName(String indexName) {
Objects.requireNonNull(indexName);

String encoded = percentEncode(indexName);
return toLowercase(encoded);
}
}
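Taken together, `sanitizeIndexName` percent-encodes the characters OpenSearch rejects in index names and then lowercases the result. A minimal standalone Scala sketch of the same behavior, assuming the character set and `%XX` encoding shown in the Java code above (the sample index name is illustrative):

```scala
import java.util.Locale

object IndexNameSanitizer {
  // Characters OpenSearch rejects in index names; '*' is excluded because
  // it is reserved for pattern matching, mirroring INVALID_INDEX_NAME_CHARS above.
  private val invalidChars: Set[Char] =
    Set(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<')

  def sanitize(indexName: String): String =
    indexName.map { ch =>
      if (invalidChars.contains(ch)) f"%%${ch.toInt}%02X" else ch.toString
    }.mkString.toLowerCase(Locale.ROOT)
}

// e.g. IndexNameSanitizer.sanitize("flint_myglue_default.App Logs_index")
//      returns "flint_myglue_default.app%20logs_index"
```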
@@ -155,6 +155,11 @@ indexColTypeList

indexColType
: identifier skipType=(PARTITION | VALUE_SET | MIN_MAX)
(LEFT_PAREN skipParams RIGHT_PAREN)?
;

skipParams
: propertyValue (COMMA propertyValue)*
;

indexName
@@ -13,7 +13,7 @@ import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
import org.opensearch.flint.spark.FlintSparkIndex.{ID_COLUMN, StreamingRefresh}
import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, ID_COLUMN, StreamingRefresh}
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
@@ -371,7 +371,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
logInfo("Start refreshing index in foreach streaming style")
val job = spark.readStream
.options(options.extraSourceOptions(tableName))
.table(tableName)
.table(quotedTableName(tableName))
.writeStream
.queryName(indexName)
.addSinkOptions(options)
@@ -95,6 +95,21 @@ object FlintSparkIndex {
s"flint_${parts(0)}_${parts(1)}_${parts.drop(2).mkString(".")}"
}

/**
* Add backticks to the table name to escape special characters.
*
* @param fullTableName
* source full table name
* @return
* quoted table name
*/
def quotedTableName(fullTableName: String): String = {
require(fullTableName.split('.').length >= 3, s"Table name $fullTableName is not qualified")

val parts = fullTableName.split('.')
s"${parts(0)}.${parts(1)}.`${parts.drop(2).mkString(".")}`"
}
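For example, the helper leaves the catalog and database parts untouched and wraps only the remaining parts in backticks, so a table name containing dots or dashes still resolves (sample names are illustrative):

```scala
import org.opensearch.flint.spark.FlintSparkIndex.quotedTableName

quotedTableName("spark_catalog.default.alb_logs")
// => "spark_catalog.default.`alb_logs`"

quotedTableName("my_glue.default.nginx.access_logs")
// => "my_glue.default.`nginx.access_logs`"
```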

/**
* Populate environment variables to persist in Flint metadata.
*
@@ -5,6 +5,8 @@

package org.opensearch.flint.spark

import java.util.Collections

import scala.collection.JavaConverters.mapAsScalaMapConverter

import org.opensearch.flint.core.metadata.FlintMetadata
@@ -45,12 +47,16 @@ object FlintSparkIndexFactory {
val skippingKind = SkippingKind.withName(getString(colInfo, "kind"))
val columnName = getString(colInfo, "columnName")
val columnType = getString(colInfo, "columnType")
val parameters = getSkipParams(colInfo)

skippingKind match {
case PARTITION =>
PartitionSkippingStrategy(columnName = columnName, columnType = columnType)
case VALUE_SET =>
ValueSetSkippingStrategy(columnName = columnName, columnType = columnType)
ValueSetSkippingStrategy(
columnName = columnName,
columnType = columnType,
params = parameters)
case MIN_MAX =>
MinMaxSkippingStrategy(columnName = columnName, columnType = columnType)
case other =>
@@ -78,6 +84,14 @@
}
}

private def getSkipParams(colInfo: java.util.Map[String, AnyRef]): Map[String, String] = {
colInfo
.getOrDefault("parameters", Collections.emptyMap())
.asInstanceOf[java.util.Map[String, String]]
.asScala
.toMap
}

private def getString(map: java.util.Map[String, AnyRef], key: String): String = {
map.get(key).asInstanceOf[String]
}
@@ -9,7 +9,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark._
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder}
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, generateSchemaJSON, metadataBuilder, quotedTableName}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}

@@ -60,7 +60,7 @@ case class FlintSparkCoveringIndex(

override def build(spark: SparkSession, df: Option[DataFrame]): DataFrame = {
val colNames = indexedColumns.keys.toSeq
val job = df.getOrElse(spark.read.table(tableName))
val job = df.getOrElse(spark.read.table(quotedTableName(tableName)))

// Add optional filtering condition
filterCondition
@@ -49,6 +49,7 @@ case class FlintSparkSkippingIndex(
.map(col =>
Map[String, AnyRef](
"kind" -> col.kind.toString,
"parameters" -> col.parameters.asJava,
"columnName" -> col.columnName,
"columnType" -> col.columnType).asJava)
.toArray
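With the new field, each entry in the index's `indexedColumns` metadata carries the skip parameters alongside the kind and column info. A sketch of what one such entry would look like for a parameterized VALUE_SET column (values are illustrative):

```scala
import scala.collection.JavaConverters.mapAsJavaMapConverter

// One indexedColumns entry as built above for a VALUE_SET column with max_size = 20.
val entry = Map[String, AnyRef](
  "kind" -> "VALUE_SET",
  "parameters" -> Map("max_size" -> "20").asJava,
  "columnName" -> "client_ip",
  "columnType" -> "string").asJava
```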
@@ -77,7 +78,7 @@
new Column(aggFunc.as(name))
}

df.getOrElse(spark.read.table(tableName))
df.getOrElse(spark.read.table(quotedTableName(tableName)))
.groupBy(input_file_name().as(FILE_PATH_COLUMN))
.agg(namedAggFuncs.head, namedAggFuncs.tail: _*)
.withColumn(ID_COLUMN, sha1(col(FILE_PATH_COLUMN)))
@@ -155,14 +156,20 @@ object FlintSparkSkippingIndex {
*
* @param colName
* indexed column name
* @param params
* value set parameters
* @return
* index builder
*/
def addValueSet(colName: String): Builder = {
def addValueSet(colName: String, params: Map[String, String] = Map.empty): Builder = {
require(tableName.nonEmpty, "table name cannot be empty")

val col = findColumn(colName)
addIndexedColumn(ValueSetSkippingStrategy(columnName = col.name, columnType = col.dataType))
addIndexedColumn(
ValueSetSkippingStrategy(
columnName = col.name,
columnType = col.dataType,
params = params))
this
}
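A hedged usage sketch of the extended builder, assuming the high-level entry points shown in the project docs (`skippingIndex()`, `onTable`, `create()`) and an existing SparkSession named `spark`; only the `params` argument on `addValueSet` is new in this change:

```scala
import org.opensearch.flint.spark.FlintSpark

// Assuming an existing SparkSession named `spark`.
val flint = new FlintSpark(spark)

// Builds a skipping index where client_ip keeps at most 20 distinct values per file.
flint
  .skippingIndex()
  .onTable("spark_catalog.default.alb_logs")
  .addValueSet("elb_status_code")                    // default max_size = 100
  .addValueSet("client_ip", Map("max_size" -> "20")) // bounded value set
  .create()
```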

@@ -31,6 +31,11 @@ trait FlintSparkSkippingStrategy {
*/
val columnType: String

/**
* Named parameters for the skipping algorithm.
*/
val parameters: Map[String, String] = Map.empty

/**
* @return
* output schema mapping from Flint field name to Flint field type
@@ -7,7 +7,7 @@ package org.opensearch.flint.spark.skipping.valueset

import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{SkippingKind, VALUE_SET}
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.DEFAULT_VALUE_SET_SIZE_LIMIT
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy.{DEFAULT_VALUE_SET_MAX_SIZE, VALUE_SET_MAX_SIZE_KEY}

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Expression, Literal}
import org.apache.spark.sql.functions._
@@ -18,14 +18,25 @@ import org.apache.spark.sql.functions._
case class ValueSetSkippingStrategy(
override val kind: SkippingKind = VALUE_SET,
override val columnName: String,
override val columnType: String)
override val columnType: String,
params: Map[String, String] = Map.empty)
extends FlintSparkSkippingStrategy {

override val parameters: Map[String, String] = {
val map = Map.newBuilder[String, String]
map ++= params

if (!params.contains(VALUE_SET_MAX_SIZE_KEY)) {
map += (VALUE_SET_MAX_SIZE_KEY -> DEFAULT_VALUE_SET_MAX_SIZE.toString)
}
map.result()
}

override def outputSchema(): Map[String, String] =
Map(columnName -> columnType)

override def getAggregators: Seq[Expression] = {
val limit = DEFAULT_VALUE_SET_SIZE_LIMIT
val limit = parameters(VALUE_SET_MAX_SIZE_KEY).toInt
val collectSet = collect_set(columnName)
val aggregator =
when(size(collectSet) > limit, lit(null))
@@ -48,8 +59,7 @@

object ValueSetSkippingStrategy {

/**
* Default limit for value set size collected. TODO: make this val once it's configurable
*/
var DEFAULT_VALUE_SET_SIZE_LIMIT = 100
/** Value set max size param key and default value */
var VALUE_SET_MAX_SIZE_KEY = "max_size"
var DEFAULT_VALUE_SET_MAX_SIZE = 100
}
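Under this scheme, `max_size` defaults to 100 but can be overridden per column. A short sketch of how the strategy resolves its parameters, based on the code above (column names are illustrative):

```scala
import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy

// No parameter given: the default max_size of 100 is filled in.
val defaults = ValueSetSkippingStrategy(columnName = "elb_status_code", columnType = "int")
defaults.parameters              // Map("max_size" -> "100")

// Explicit max_size: collect_set results larger than 20 are stored as null
// instead of an unbounded value set, per getAggregators above.
val bounded = ValueSetSkippingStrategy(
  columnName = "client_ip",
  columnType = "string",
  params = Map("max_size" -> "20"))
bounded.parameters               // Map("max_size" -> "20")
```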