support shard level split on read path (opensearch-project#402)
Signed-off-by: Peng Huo <[email protected]>
penghuo authored Jul 9, 2024
1 parent b5715f6 commit 087a9df
Showing 15 changed files with 326 additions and 63 deletions.
3 changes: 3 additions & 0 deletions docs/opensearch-table.md
@@ -58,3 +58,6 @@ df.show()

### table operation
- Tables only support read operations, for instance SELECT and DESCRIBE.

## InputPartition
Each InputPartition represents a data split that can be processed by a single Spark task. The number of input partitions returned corresponds to the number of RDD partitions produced by this scan. An OpenSearch table can contain multiple indices, each comprising multiple shards. The input partition count is determined by multiplying the number of indices by the number of shards. Read requests are divided and executed in parallel across multiple shards on multiple nodes.
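For illustration, here is a minimal sketch of the split arithmetic, assuming a table backed by two indices with three primary shards each (the index names and shard counts are made up; `PartitionInfo` and `ShardInfo` are introduced by this change):

```scala
import org.apache.spark.opensearch.table.{PartitionInfo, ShardInfo}

// Hypothetical layout: 2 indices x 3 shards each => 2 * 3 = 6 input partitions,
// so the scan runs as 6 parallel Spark tasks.
val partitions = Array(
  PartitionInfo("logs-a", Array(ShardInfo("logs-a", 0), ShardInfo("logs-a", 1), ShardInfo("logs-a", 2))),
  PartitionInfo("logs-b", Array(ShardInfo("logs-b", 0), ShardInfo("logs-b", 1), ShardInfo("logs-b", 2))))

// Mirrors FlintScan.planInputPartitions below: one split per shard.
val splitCount = partitions.map(_.shards.length).sum
assert(splitCount == 6)
```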
@@ -74,6 +74,16 @@ public interface FlintClient {
*/
FlintReader createReader(String indexName, String query);

/**
* Create {@link FlintReader}.
*
* @param indexName index name.
* @param query DSL query. A null query means match_all.
* @param shardId shard id.
* @return {@link FlintReader}.
*/
FlintReader createReader(String indexName, String query, String shardId);

/**
* Create {@link FlintWriter}.
*
@@ -14,6 +14,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -62,6 +63,9 @@ public class FlintOpenSearchClient implements FlintClient {
private final static Set<Character> INVALID_INDEX_NAME_CHARS =
Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<');

private final static Function<String, String> SHARD_ID_PREFERENCE =
shardId -> shardId == null ? shardId : "_shards:" + shardId;

private final FlintOptions options;

public FlintOpenSearchClient(FlintOptions options) {
@@ -173,7 +177,20 @@ public void deleteIndex(String indexName) {
*/
@Override
public FlintReader createReader(String indexName, String query) {
LOG.info("Creating Flint index reader for " + indexName + " with query " + query);
return createReader(indexName, query, null);
}

/**
* Create {@link FlintReader}.
*
* @param indexName index name.
* @param query DSL query. A null query means match_all.
* @param shardId shard id.
* @return {@link FlintReader}.
*/
@Override
public FlintReader createReader(String indexName, String query, String shardId) {
LOG.info("Creating Flint index reader for " + indexName + " with query " + query + " shardId " + shardId);
try {
QueryBuilder queryBuilder = new MatchAllQueryBuilder();
if (!Strings.isNullOrEmpty(query)) {
@@ -185,7 +202,8 @@ public FlintReader createReader(String indexName, String query) {
return new OpenSearchScrollReader(createClient(),
sanitizeIndexName(indexName),
new SearchSourceBuilder().query(queryBuilder),
options);
options,
SHARD_ID_PREFERENCE.apply(shardId));
} catch (IOException e) {
throw new RuntimeException(e);
}
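As a rough sketch of the routing above (not part of this change): a non-null shard id becomes the OpenSearch `_shards:<id>` search preference, so the reader scans only that shard, while a null shard id leaves the preference unset and the whole index is read.

```scala
// Mirrors SHARD_ID_PREFERENCE / applyPreference in this commit.
def shardPreference(shardId: String): String =
  if (shardId == null) null else "_shards:" + shardId

assert(shardPreference("3") == "_shards:3") // scroll reader pinned to shard 3
assert(shardPreference(null) == null)       // no preference: all shards
```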
@@ -19,6 +19,8 @@

import java.io.IOException;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

@@ -35,8 +37,23 @@ public class OpenSearchScrollReader extends OpenSearchReader {

private String scrollId = null;

public OpenSearchScrollReader(IRestHighLevelClient client, String indexName, SearchSourceBuilder searchSourceBuilder, FlintOptions options) {
super(client, new SearchRequest().indices(indexName).source(searchSourceBuilder.size(options.getScrollSize())));
public OpenSearchScrollReader(
IRestHighLevelClient client,
String indexName,
SearchSourceBuilder searchSourceBuilder,
FlintOptions options) {
this(client, indexName, searchSourceBuilder, options, null);
}

public OpenSearchScrollReader(
IRestHighLevelClient client,
String indexName,
SearchSourceBuilder searchSourceBuilder,
FlintOptions options,
String preference) {
super(client,
applyPreference(preference).apply(new SearchRequest().indices(indexName)
.source(searchSourceBuilder.size(options.getScrollSize()))));
this.options = options;
this.scrollDuration = TimeValue.timeValueMinutes(options.getScrollDuration());
}
@@ -53,8 +70,8 @@ Optional<SearchResponse> search(SearchRequest request) throws IOException {
} else {
try {
return Optional
.of(client.scroll(new SearchScrollRequest().scroll(scrollDuration).scrollId(scrollId),
RequestOptions.DEFAULT));
} catch (OpenSearchStatusException e) {
LOG.log(Level.WARNING, "scroll context not exist", e);
scrollId = null;
@@ -88,4 +105,12 @@ void clean() throws IOException {
public String getScrollId() {
return scrollId;
}

private static Function<SearchRequest, SearchRequest> applyPreference(String preference) {
if (Strings.isNullOrEmpty(preference)) {
return searchRequest -> searchRequest;
} else {
return searchRequest -> searchRequest.preference(preference);
}
}
}
@@ -39,12 +39,10 @@ class OpenSearchCatalog extends CatalogPlugin with TableCatalog with Logging {

override def name(): String = catalogName

@throws[NoSuchNamespaceException]
override def listTables(namespace: Array[String]): Array[Identifier] = {
throw new UnsupportedOperationException("OpenSearchCatalog does not support listTables")
}

@throws[NoSuchTableException]
override def loadTable(ident: Identifier): Table = {
logInfo(s"Loading table ${ident.name()}")
if (!ident.namespace().exists(n => OpenSearchCatalog.isDefaultNamespace(n))) {
@@ -0,0 +1,62 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.opensearch.table

import scala.collection.JavaConverters._

import org.opensearch.flint.core.{FlintClientBuilder, FlintOptions}
import org.opensearch.flint.core.metadata.FlintMetadata

import org.apache.spark.sql.flint.datatype.FlintDataType
import org.apache.spark.sql.types.StructType

/**
* Represents an OpenSearch table.
*
* @param tableName
* The name of the table.
* @param metadata
* Metadata of the table.
*/
case class OpenSearchTable(tableName: String, metadata: Map[String, FlintMetadata]) {
/*
* FIXME: we use the schema of the first index when a table spans multiple indices;
* we should merge the StructTypes to widen types.
*/
lazy val schema: StructType = {
metadata.values.headOption
.map(m => FlintDataType.deserialize(m.getContent))
.getOrElse(StructType(Nil))
}

lazy val partitions: Array[PartitionInfo] = {
metadata.map { case (partitionName, metadata) =>
PartitionInfo.apply(partitionName, metadata.indexSettings.get)
}.toArray
}
}

object OpenSearchTable {

/**
* Creates an OpenSearchTable instance.
*
* @param tableName
* tableName supports (1) a single index name, (2) a wildcard index name, or (3) comma-separated index names.
* @param options
* The options for Flint.
* @return
* An instance of OpenSearchTable.
*/
def apply(tableName: String, options: FlintOptions): OpenSearchTable = {
OpenSearchTable(
tableName,
FlintClientBuilder
.build(options)
.getAllIndexMetadata(tableName.split(","): _*)
.asScala
.toMap)
}
}
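A hypothetical usage sketch (the index names and Flint option keys here are assumptions, not part of this change): build a table over comma-separated indices and inspect its per-index shard layout.

```scala
import scala.collection.JavaConverters._

import org.opensearch.flint.core.FlintOptions
import org.apache.spark.opensearch.table.OpenSearchTable

object OpenSearchTableExample {
  def main(args: Array[String]): Unit = {
    // Assumed option keys; adjust to your cluster configuration.
    val options = new FlintOptions(Map("host" -> "localhost", "port" -> "9200").asJava)

    // Comma-separated names are supported; wildcards such as "logs-*" also work.
    val table = OpenSearchTable("logs-2024.07.08,logs-2024.07.09", options)

    println(table.schema.treeString)
    table.partitions.foreach { p =>
      println(s"index=${p.partitionName} shards=${p.shards.map(_.id).mkString(",")}")
    }
  }
}
```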
@@ -0,0 +1,54 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.opensearch.table

import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.JsonMethods
import org.json4s.native.Serialization

/**
* Represents information about a partition in OpenSearch. A partition is backed by an
* OpenSearch index. Each partition contains a list of shards.
*
* @param partitionName
* partition name.
* @param shards
* shards.
*/
case class PartitionInfo(partitionName: String, shards: Array[ShardInfo])

object PartitionInfo {
implicit val formats: Formats = Serialization.formats(NoTypeHints)

/**
* Creates a PartitionInfo instance.
*
* @param partitionName
* The name of the partition.
* @param settings
* The settings of the partition.
* @return
* An instance of PartitionInfo.
*/
def apply(partitionName: String, settings: String): PartitionInfo = {
val shards =
Range.apply(0, numberOfShards(settings)).map(id => ShardInfo(partitionName, id)).toArray
PartitionInfo(partitionName, shards)
}

/**
* Extracts the number of shards from the settings string.
*
* @param settingStr
* The settings string.
* @return
* The number of shards.
*/
def numberOfShards(settingStr: String): Int = {
val setting = JsonMethods.parse(settingStr)
(setting \ "index.number_of_shards").extract[String].toInt
}
}
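A small sketch of how the shard count is read from the index settings, using a made-up flat settings payload of the shape this code expects:

```scala
import org.apache.spark.opensearch.table.PartitionInfo

// Assumed settings JSON; only index.number_of_shards is consulted here.
val settings = """{"index.number_of_shards":"3","index.number_of_replicas":"1"}"""

val partition = PartitionInfo("my-index", settings)
assert(PartitionInfo.numberOfShards(settings) == 3)
assert(partition.shards.map(_.id).toSeq == Seq(0, 1, 2))
```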
@@ -0,0 +1,16 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.apache.spark.opensearch.table

/**
* Represents information about a shard in OpenSearch.
*
* @param indexName
* The name of the index.
* @param id
* The ID of the shard.
*/
case class ShardInfo(indexName: String, id: Int)
@@ -15,14 +15,19 @@ import org.apache.spark.sql.flint.storage.FlintQueryCompiler
import org.apache.spark.sql.types.StructType

case class FlintPartitionReaderFactory(
tableName: String,
schema: StructType,
options: FlintSparkConf,
pushedPredicates: Array[Predicate])
extends PartitionReaderFactory {
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
val query = FlintQueryCompiler(schema).compile(pushedPredicates)
val flintClient = FlintClientBuilder.build(options.flintOptions())
new FlintPartitionReader(flintClient.createReader(tableName, query), schema, options)
partition match {
case OpenSearchSplit(shardInfo) =>
val query = FlintQueryCompiler(schema).compile(pushedPredicates)
val flintClient = FlintClientBuilder.build(options.flintOptions())
new FlintPartitionReader(
flintClient.createReader(shardInfo.indexName, query, shardInfo.id.toString),
schema,
options)
}
}
}
@@ -12,6 +12,7 @@ import scala.collection.JavaConverters._
import org.opensearch.flint.core.FlintClientBuilder

import org.apache.spark.opensearch.catalog.OpenSearchCatalog
import org.apache.spark.opensearch.table.OpenSearchTable
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, TableCapability}
import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, BATCH_WRITE, STREAMING_WRITE, TRUNCATE}
@@ -41,25 +42,17 @@ class FlintReadOnlyTable(

lazy val name: String = flintSparkConf.tableName()

// todo. currently, we use first index schema in multiple indices. we should merge StructType
// to widen type
lazy val openSearchTable: OpenSearchTable =
OpenSearchTable.apply(name, flintSparkConf.flintOptions())

lazy val schema: StructType = {
userSpecifiedSchema.getOrElse {
FlintClientBuilder
.build(flintSparkConf.flintOptions())
.getAllIndexMetadata(OpenSearchCatalog.indexNames(name): _*)
.values()
.asScala
.headOption
.map(m => FlintDataType.deserialize(m.getContent))
.getOrElse(StructType(Nil))
}
userSpecifiedSchema.getOrElse { openSearchTable.schema }
}

override def capabilities(): util.Set[TableCapability] =
util.EnumSet.of(BATCH_READ)

override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
FlintScanBuilder(name, schema, flintSparkConf)
FlintScanBuilder(openSearchTable, schema, flintSparkConf)
}
}
@@ -5,13 +5,14 @@

package org.apache.spark.sql.flint

import org.apache.spark.opensearch.table.{OpenSearchTable, ShardInfo}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Batch, InputPartition, PartitionReaderFactory, Scan}
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.types.StructType

case class FlintScan(
tableName: String,
table: OpenSearchTable,
schema: StructType,
options: FlintSparkConf,
pushedPredicates: Array[Predicate])
@@ -21,11 +22,11 @@ case class FlintScan(
override def readSchema(): StructType = schema

override def planInputPartitions(): Array[InputPartition] = {
Array(OpenSearchInputPartition())
table.partitions.flatMap(p => p.shards.map(s => OpenSearchSplit(s))).toArray
}

override def createReaderFactory(): PartitionReaderFactory = {
FlintPartitionReaderFactory(tableName, schema, options, pushedPredicates)
FlintPartitionReaderFactory(schema, options, pushedPredicates)
}

override def toBatch: Batch = this
Expand All @@ -40,5 +41,10 @@ case class FlintScan(
private def seqToString(seq: Seq[Any]): String = seq.mkString("[", ", ", "]")
}

// todo. add partition support.
private[spark] case class OpenSearchInputPartition() extends InputPartition {}
/**
* Each OpenSearchSplit is backed by an OpenSearch shard.
*
* @param shardInfo
*   the shard backing this split.
*/
private[spark] case class OpenSearchSplit(shardInfo: ShardInfo) extends InputPartition