Merge branch 'main' into add-filtering-condition-empty-syntax
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Oct 25, 2023
2 parents 4695f4b + 022b974 commit 728d83b
Showing 30 changed files with 1,773 additions and 274 deletions.
build.sbt (12 changes: 11 additions & 1 deletion)
@@ -179,7 +179,17 @@ lazy val sparkSqlApplication = (project in file("spark-sql-application"))
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "3.2.15" % "test"),
libraryDependencies ++= deps(sparkVersion),
libraryDependencies += "com.typesafe.play" %% "play-json" % "2.9.2",
libraryDependencies ++= Seq(
"com.typesafe.play" %% "play-json" % "2.9.2",
// handle AmazonS3Exception
"com.amazonaws" % "aws-java-sdk-s3" % "1.12.568" % "provided"
// the transitive jackson.core dependency conflicts with existing scala
// error: Scala module 2.13.4 requires Jackson Databind version >= 2.13.0 and < 2.14.0 -
// Found jackson-databind version 2.14.2
exclude ("com.fasterxml.jackson.core", "jackson-databind"),
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.mockito" %% "mockito-scala" % "1.16.42" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test"),
// Assembly settings
// the sbt assembly plugin found multiple copies of the module-info.class file with
// different contents in the jars that it was merging flintCore dependencies.
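
The module-info.class clash mentioned in the comment above is usually handled with an explicit sbt-assembly merge strategy. The snippet below is a generic sketch of that workaround, not necessarily the exact settings used in this build:

```scala
// Sketch only: discard module-info.class entries (including the
// META-INF/versions/N/ copies) when sbt-assembly merges dependency jars.
assembly / assemblyMergeStrategy := {
  case PathList(ps @ _*) if ps.last == "module-info.class" => MergeStrategy.discard
  case other =>
    val oldStrategy = (assembly / assemblyMergeStrategy).value
    oldStrategy(other)
}
```
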
docs/index.md (5 changes: 5 additions & 0 deletions)
@@ -195,6 +195,8 @@ CREATE MATERIALIZED VIEW [IF NOT EXISTS] name
AS <query>
WITH ( options )

REFRESH MATERIALIZED VIEW name

SHOW MATERIALIZED [VIEW|VIEWS] IN catalog[.database]

[DESC|DESCRIBE] MATERIALIZED VIEW name
@@ -213,6 +215,8 @@ SELECT
FROM alb_logs
GROUP BY TUMBLE(time, '1 Minute')

REFRESH MATERIALIZED VIEW alb_logs_metrics

SHOW MATERIALIZED VIEWS IN spark_catalog.default

DESC MATERIALIZED VIEW alb_logs_metrics
@@ -341,6 +345,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.flint.optimizer.enabled`: default is true.
- `spark.flint.index.hybridscan.enabled`: default is false.
- `spark.flint.index.checkpoint.mandatory`: default is true.

#### Data Type Mapping

@@ -73,7 +73,6 @@ public interface FlintClient {
* @return {@link FlintWriter}
*/
FlintWriter createWriter(String indexName);

/**
* Create {@link RestHighLevelClient}.
* @return {@link RestHighLevelClient}
@@ -0,0 +1,76 @@
package org.opensearch.flint.core.storage;

import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.flint.core.FlintClient;
import org.opensearch.flint.core.FlintClientBuilder;
import org.opensearch.flint.core.FlintOptions;

import java.io.IOException;

public class OpenSearchUpdater {
private final String indexName;

private final FlintClient flintClient;


public OpenSearchUpdater(String indexName, FlintClient flintClient) {
this.indexName = indexName;
this.flintClient = flintClient;
}

public void upsert(String id, String doc) {
// we might need to keep the updater for a long time. Reusing the client may not work as the temporary
// credentials may expire.
// also, failure to close the client causes the job to be stuck in the running state as the client resource
// is not released.
try (RestHighLevelClient client = flintClient.createClient()) {
UpdateRequest
updateRequest =
new UpdateRequest(indexName, id).doc(doc, XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.docAsUpsert(true);
client.update(updateRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(String.format(
"Failed to execute update request on index: %s, id: %s",
indexName,
id), e);
}
}

public void update(String id, String doc) {
try (RestHighLevelClient client = flintClient.createClient()) {
UpdateRequest
updateRequest =
new UpdateRequest(indexName, id).doc(doc, XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
client.update(updateRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(String.format(
"Failed to execute update request on index: %s, id: %s",
indexName,
id), e);
}
}

public void updateIf(String id, String doc, long seqNo, long primaryTerm) {
try (RestHighLevelClient client = flintClient.createClient()) {
UpdateRequest
updateRequest =
new UpdateRequest(indexName, id).doc(doc, XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.setIfSeqNo(seqNo)
.setIfPrimaryTerm(primaryTerm);
client.update(updateRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
throw new RuntimeException(String.format(
"Failed to execute update request on index: %s, id: %s",
indexName,
id), e);
}
}
}
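
A rough usage sketch for the updater above (Scala; the index name and documents are invented for illustration, and `flintClient` is assumed to be a FlintClient built elsewhere):

```scala
// Hypothetical caller; `flintClient` is an already-built
// org.opensearch.flint.core.FlintClient instance.
val updater = new OpenSearchUpdater("my-metadata-index", flintClient)

// Create the document if it is missing, otherwise merge the fields into it.
updater.upsert("statement-1", """{"state": "running"}""")

// Plain update: rejected by OpenSearch if the document does not exist.
updater.update("statement-1", """{"state": "success"}""")

// Conditional update: applied only while the document still carries the
// given sequence number and primary term (optimistic concurrency control).
updater.updateIf("statement-1", """{"state": "failed"}""", 5L, 1L)
```
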
@@ -81,6 +81,7 @@ dropCoveringIndexStatement

materializedViewStatement
: createMaterializedViewStatement
| refreshMaterializedViewStatement
| showMaterializedViewStatement
| describeMaterializedViewStatement
| dropMaterializedViewStatement
@@ -92,6 +93,10 @@ createMaterializedViewStatement
(WITH LEFT_PAREN propertyList RIGHT_PAREN)?
;

refreshMaterializedViewStatement
: REFRESH MATERIALIZED VIEW mvName=multipartIdentifier
;

showMaterializedViewStatement
: SHOW MATERIALIZED (VIEW | VIEWS) IN catalogDb=multipartIdentifier
;
@@ -113,6 +113,10 @@ object FlintSparkConf {
val HYBRID_SCAN_ENABLED = FlintConfig("spark.flint.index.hybridscan.enabled")
.doc("Enable hybrid scan to include latest source data not refreshed to index yet")
.createWithDefault("false")

val CHECKPOINT_MANDATORY = FlintConfig("spark.flint.index.checkpoint.mandatory")
.doc("Checkpoint location for incremental refresh index will be mandatory if enabled")
.createWithDefault("true")
}

/**
@@ -137,6 +141,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable

def isHybridScanEnabled: Boolean = HYBRID_SCAN_ENABLED.readFrom(reader).toBoolean

def isCheckpointMandatory: Boolean = CHECKPOINT_MANDATORY.readFrom(reader).toBoolean

/**
* spark.sql.session.timeZone
*/
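
A short sketch of how the checkpoint flag introduced above (`spark.flint.index.checkpoint.mandatory`) can be toggled on a session; only the Flint config key comes from this diff, the rest of the setup is illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative session setup (app name is hypothetical).
val spark = SparkSession.builder()
  .appName("flint-checkpoint-demo")
  // Default is "true": incremental refresh without a checkpoint location fails fast.
  // Set to "false" to fall back to the previous behavior of running without one.
  .config("spark.flint.index.checkpoint.mandatory", "false")
  .getOrCreate()
```
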
@@ -0,0 +1,70 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.app

import org.json4s.{Formats, NoTypeHints}
import org.json4s.JsonAST.JString
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization

class FlintCommand(
var state: String,
val query: String,
// statementId is the statement type doc id
val statementId: String,
val queryId: String,
val submitTime: Long,
var error: Option[String] = None) {
def running(): Unit = {
state = "running"
}

def complete(): Unit = {
state = "success"
}

def fail(): Unit = {
state = "failed"
}

def isRunning(): Boolean = {
state == "running"
}

def isComplete(): Boolean = {
state == "success"
}

def isFailed(): Boolean = {
state == "failed"
}
}

object FlintCommand {

implicit val formats: Formats = Serialization.formats(NoTypeHints)

def deserialize(command: String): FlintCommand = {
val meta = parse(command)
val state = (meta \ "state").extract[String]
val query = (meta \ "query").extract[String]
val statementId = (meta \ "statementId").extract[String]
val queryId = (meta \ "queryId").extract[String]
val submitTime = (meta \ "submitTime").extract[Long]
val maybeError: Option[String] = (meta \ "error") match {
case JString(str) => Some(str)
case _ => None
}

new FlintCommand(state, query, statementId, queryId, submitTime, maybeError)
}

def serialize(flintCommand: FlintCommand): String = {
// we only need to modify state and error
Serialization.write(
Map("state" -> flintCommand.state, "error" -> flintCommand.error.getOrElse("")))
}
}
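
A small round-trip sketch for the class above; the statement document is invented, but the field names match what `deserialize` reads:

```scala
import org.opensearch.flint.app.FlintCommand

// Hypothetical statement doc; "error" may be omitted and defaults to None.
val doc =
  """{"state": "waiting", "query": "SELECT 1", "statementId": "stmt-1", "queryId": "q-1", "submitTime": 1698192000000}"""

val command = FlintCommand.deserialize(doc)
command.running()
assert(command.isRunning())

command.complete()
// serialize() writes back only the mutable fields, e.g. {"state":"success","error":""}
println(FlintCommand.serialize(command))
```
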
@@ -0,0 +1,55 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.app

import org.json4s.{Formats, NoTypeHints}
import org.json4s.JsonAST.JString
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
import org.opensearch.index.seqno.SequenceNumbers

// lastUpdateTime is added to FlintInstance to track the last update time of the instance. Its unit is millisecond.
class FlintInstance(
val applicationId: String,
val jobId: String,
// sessionId is the session type doc id
val sessionId: String,
val state: String,
val lastUpdateTime: Long,
val error: Option[String] = None) {}

object FlintInstance {

implicit val formats: Formats = Serialization.formats(NoTypeHints)

def deserialize(job: String): FlintInstance = {
val meta = parse(job)
val applicationId = (meta \ "applicationId").extract[String]
val state = (meta \ "state").extract[String]
val jobId = (meta \ "jobId").extract[String]
val sessionId = (meta \ "sessionId").extract[String]
val lastUpdateTime = (meta \ "lastUpdateTime").extract[Long]
val maybeError: Option[String] = (meta \ "error") match {
case JString(str) => Some(str)
case _ => None
}

new FlintInstance(applicationId, jobId, sessionId, state, lastUpdateTime, maybeError)
}

def serialize(job: FlintInstance): String = {
Serialization.write(
Map(
"type" -> "session",
"sessionId" -> job.sessionId,
"error" -> job.error.getOrElse(""),
"applicationId" -> job.applicationId,
"jobId" -> job.jobId,
"state" -> job.state,
// update last update time
"lastUpdateTime" -> System.currentTimeMillis()))
}
}
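
One behavior worth noting in the class above: `serialize` does not echo the stored `lastUpdateTime` but stamps the current time, so every write refreshes the session's heartbeat. A brief sketch with invented values:

```scala
import org.opensearch.flint.app.FlintInstance

val session = new FlintInstance(
  applicationId = "app-123",
  jobId = "job-456",
  sessionId = "session-789",
  state = "running",
  lastUpdateTime = 0L) // ignored on write; serialize() uses System.currentTimeMillis()

// Emits a doc with type="session", the fields above, and a fresh lastUpdateTime.
println(FlintInstance.serialize(session))
```
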
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}
import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}
import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}

/**
@@ -247,7 +247,13 @@ class FlintSpark(val spark: SparkSession) {
}

def addCheckpointLocation(checkpointLocation: Option[String]): DataStreamWriter[Row] = {
checkpointLocation.map(dataStream.option("checkpointLocation", _)).getOrElse(dataStream)
checkpointLocation match {
case Some(location) => dataStream.option("checkpointLocation", location)
case None if flintSparkConf.isCheckpointMandatory =>
throw new IllegalStateException(
s"Checkpoint location is mandatory for incremental refresh if ${CHECKPOINT_MANDATORY.key} enabled")
case _ => dataStream
}
}

def addRefreshInterval(refreshInterval: Option[String]): DataStreamWriter[Row] = {
@@ -135,9 +135,10 @@ object FlintSparkIndex {
}

def generateSchemaJSON(allFieldTypes: Map[String, String]): String = {
// Backtick column names to escape special characters, otherwise fromDDL() will fail
val catalogDDL =
allFieldTypes
.map { case (colName, colType) => s"$colName $colType not null" }
.map { case (colName, colType) => s"`$colName` $colType not null" }
.mkString(",")

val structType = StructType.fromDDL(catalogDDL)
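
The backtick change above matters for source columns whose names contain characters the DDL parser would otherwise reject, such as dots or hyphens. A quick Spark-only illustration (column names invented):

```scala
import org.apache.spark.sql.types.StructType

// Unquoted, a dotted name normally fails to parse as a single column:
// StructType.fromDDL("client.ip string not null")   // ParseException

// With the backticks now emitted by generateSchemaJSON, the same name is
// accepted as one top-level column.
val schema = StructType.fromDDL("`client.ip` string not null, `request-size` int not null")
println(schema.json)
```
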
@@ -15,6 +15,7 @@ import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser._

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Command
import org.apache.spark.sql.types.StringType

/**
@@ -24,7 +25,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
self: SparkSqlAstBuilder =>

override def visitCreateMaterializedViewStatement(
ctx: CreateMaterializedViewStatementContext): AnyRef = {
ctx: CreateMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val mvName = getFullTableName(flint, ctx.mvName)
val query = getSqlText(ctx.query)
@@ -49,8 +50,17 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
}
}

override def visitRefreshMaterializedViewStatement(
ctx: RefreshMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val flintIndexName = getFlintIndexName(flint, ctx.mvName)
flint.refreshIndex(flintIndexName, RefreshMode.FULL)
Seq.empty
}
}

override def visitShowMaterializedViewStatement(
ctx: ShowMaterializedViewStatementContext): AnyRef = {
ctx: ShowMaterializedViewStatementContext): Command = {
val outputSchema = Seq(
AttributeReference("materialized_view_name", StringType, nullable = false)())

@@ -66,7 +76,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
}

override def visitDescribeMaterializedViewStatement(
ctx: DescribeMaterializedViewStatementContext): AnyRef = {
ctx: DescribeMaterializedViewStatementContext): Command = {
val outputSchema = Seq(
AttributeReference("output_col_name", StringType, nullable = false)(),
AttributeReference("data_type", StringType, nullable = false)())
@@ -85,7 +95,7 @@ trait FlintSparkMaterializedViewAstBuilder extends FlintSparkSqlExtensionsVisito
}

override def visitDropMaterializedViewStatement(
ctx: DropMaterializedViewStatementContext): AnyRef = {
ctx: DropMaterializedViewStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
flint.deleteIndex(getFlintIndexName(flint, ctx.mvName))
Seq.empty
