Merge branch 'main' into ppl-projection-command
YANG-DB committed Nov 26, 2024
2 parents b8e02fc + 3ff2ef2 commit 11900d9
Showing 18 changed files with 341 additions and 41 deletions.
28 changes: 24 additions & 4 deletions build.sbt
@@ -2,8 +2,7 @@
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
import Dependencies._
import sbtassembly.AssemblyPlugin.autoImport.ShadeRule
import Dependencies.*

lazy val scala212 = "2.12.14"
lazy val sparkVersion = "3.5.1"
@@ -38,6 +37,11 @@ ThisBuild / scalastyleConfig := baseDirectory.value / "scalastyle-config.xml"
*/
ThisBuild / Test / parallelExecution := false

/**
* Set the parallelism of forked tests to 4 to accelerate integration tests
*/
concurrentRestrictions in Global := Seq(Tags.limit(Tags.ForkedTestGroup, 4))

// Run as part of compile task.
lazy val compileScalastyle = taskKey[Unit]("compileScalastyle")

@@ -274,13 +278,29 @@ lazy val integtest = (project in file("integ-test"))
IntegrationTest / javaSource := baseDirectory.value / "src/integration/java",
IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala",
IntegrationTest / resourceDirectory := baseDirectory.value / "src/integration/resources",
IntegrationTest / parallelExecution := false,
IntegrationTest / parallelExecution := true, // enable parallel execution
IntegrationTest / testForkedParallel := false, // disable forked parallel execution to avoid duplicate spark context in the same JVM
IntegrationTest / fork := true,
IntegrationTest / testGrouping := {
val tests = (IntegrationTest / definedTests).value
val forkOptions = ForkOptions()
val groups = tests.grouped(tests.size / 4 + 1).zipWithIndex.map { case (group, index) =>
val groupName = s"group-${index + 1}"
new Tests.Group(
name = groupName,
tests = group,
runPolicy = Tests.SubProcess(
forkOptions.withRunJVMOptions(forkOptions.runJVMOptions ++
Seq(s"-Djava.io.tmpdir=${baseDirectory.value}/integ-test/target/tmp/$groupName")))
)
}
groups.toSeq
}
)),
inConfig(AwsIntegrationTest)(Defaults.testSettings ++ Seq(
AwsIntegrationTest / javaSource := baseDirectory.value / "src/aws-integration/java",
AwsIntegrationTest / scalaSource := baseDirectory.value / "src/aws-integration/scala",
AwsIntegrationTest / parallelExecution := false,
AwsIntegrationTest / parallelExecution := true,
AwsIntegrationTest / fork := true,
)),
libraryDependencies ++= Seq(
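As a side note on the test-grouping change above, the following standalone Scala sketch (not part of the build; the count of 18 discovered suites and the suite names are assumptions) illustrates how `tests.grouped(tests.size / 4 + 1)` splits the integration suites into at most four groups, each forked into its own JVM, while `Tags.limit(Tags.ForkedTestGroup, 4)` caps how many groups run concurrently.

```scala
// Standalone sketch of the grouping arithmetic used in build.sbt above (not part of the build).
// Assumption: 18 discovered integration suites; the names are made up for illustration.
object TestGroupingSketch extends App {
  val tests = (1 to 18).map(i => s"ExampleITSuite$i")

  // Same formula as the build: chunk size = size / 4 + 1, which yields at most 4 groups.
  val groupSize = tests.size / 4 + 1 // 18 / 4 + 1 = 5
  val groups = tests.grouped(groupSize).zipWithIndex.map { case (group, index) =>
    s"group-${index + 1}" -> group
  }.toSeq

  groups.foreach { case (name, members) =>
    println(s"$name: ${members.size} suites") // group-1..3: 5 suites each, group-4: 3 suites
  }
  // In the real build each group runs as Tests.SubProcess with its own java.io.tmpdir,
  // and the Global Tags.limit(Tags.ForkedTestGroup, 4) allows up to 4 groups in parallel.
}
```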
1 change: 1 addition & 0 deletions docs/ppl-lang/PPL-Example-Commands.md
@@ -140,6 +140,7 @@ Assumptions: `a`, `b`, `c`, `d`, `e` are existing fields in `table`
Assumptions: `bridges`, `coor` are existing fields in `table`, and the field's types are `struct<?,?>` or `array<struct<?,?>>`
- `source = table | flatten bridges`
- `source = table | flatten coor`
- `source = table | flatten coor as (altitude, latitude, longitude)`
- `source = table | flatten bridges | flatten coor`
- `source = table | fields bridges | flatten bridges`
- `source = table | fields country, bridges | flatten bridges | fields country, length | stats avg(length) as avg by country`
19 changes: 17 additions & 2 deletions docs/ppl-lang/ppl-flatten-command.md
@@ -7,9 +7,10 @@ Using `flatten` command to flatten a field of type:


### Syntax
`flatten <field>`
`flatten <field> [As aliasSequence]`

* field: to be flattened. The field must be of supported type.
* aliasSequence: the aliases to use for the flattened output fields. Enclose the alias sequence in parentheses when more than one field is produced.

### Test table
#### Schema
@@ -87,4 +88,18 @@ PPL query:
| 2024-09-13T12:00:00 | Prague | Czech Republic| 343 | Legion Bridge | 200 | 50.0755| 14.4378|
| 2024-09-13T12:00:00 | Budapest| Hungary | 375 | Chain Bridge | 96 | 47.4979| 19.0402|
| 2024-09-13T12:00:00 | Budapest| Hungary | 333 | Liberty Bridge | 96 | 47.4979| 19.0402|
| 1990-09-13T12:00:00 | Warsaw | Poland | NULL | NULL | NULL | NULL | NULL |

### Example 4: flatten with aliasSequence
This example shows how to flatten a field and rename the flattened output fields using an alias sequence.
PPL query:
- `source=table | flatten coor as (altitude, latitude, longitude)`

| \_time              | bridges                                      | city    | country       | altitude | latitude | longitude  |
|---------------------|----------------------------------------------|---------|---------------|----------|----------|------------|
| 2024-09-13T12:00:00 | [{801, Tower Bridge}, {928, London Bridge}] | London | England | 35 | 51.5074 | -0.1278 |
| 2024-09-13T12:00:00 | [{232, Pont Neuf}, {160, Pont Alexandre III}]| Paris | France | 35 | 48.8566 | 2.3522 |
| 2024-09-13T12:00:00 | [{48, Rialto Bridge}, {11, Bridge of Sighs}] | Venice | Italy | 2 | 45.4408 | 12.3155 |
| 2024-09-13T12:00:00 | [{516, Charles Bridge}, {343, Legion Bridge}]| Prague | Czech Republic| 200 | 50.0755 | 14.4378 |
| 2024-09-13T12:00:00 | [{375, Chain Bridge}, {333, Liberty Bridge}] | Budapest| Hungary | 96 | 47.4979 | 19.0402 |
| 1990-09-13T12:00:00 | NULL | Warsaw | Poland | NULL | NULL | NULL |
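
For readers who think in Spark DataFrame terms, the flatten-with-aliases behavior documented above roughly corresponds to promoting a struct column's fields to top-level columns under new names. The sketch below is an illustration only, not the actual PPL-to-Catalyst translation; the nested field names `alt`, `lat`, `long` and the two sample rows are assumptions based on the example table.

```scala
// Rough Spark approximation of `source = table | flatten coor as (altitude, latitude, longitude)`.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Nested struct with assumed field names matching the sample `coor` column.
case class Coor(alt: Int, lat: Double, long: Double)

object FlattenAliasSketch extends App {
  val spark = SparkSession.builder().master("local[1]").appName("flatten-alias-sketch").getOrCreate()
  import spark.implicits._

  val df = Seq(
    ("London", "England", Coor(35, 51.5074, -0.1278)),
    ("Venice", "Italy", Coor(2, 45.4408, 12.3155))).toDF("city", "country", "coor")

  // "flatten coor as (altitude, latitude, longitude)": expose the struct's fields as
  // top-level columns under the given aliases and drop the original struct column.
  val flattened = df.select(
    col("city"),
    col("country"),
    col("coor.alt").as("altitude"),
    col("coor.lat").as("latitude"),
    col("coor.long").as("longitude"))

  flattened.show(truncate = false)
  spark.stop()
}
```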
@@ -98,15 +98,15 @@ static void recordLatency(String metricNamePrefix, long latencyMilliseconds) {
* Otherwise, it increments a general failure metric counter based on the status code category (e.g., 4xx, 5xx).
*
* @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for failure
* @param e the exception encountered during the operation, used to determine the type of failure
* @param t the exception encountered during the operation, used to determine the type of failure
*/
static void recordOperationFailure(String metricNamePrefix, Exception e) {
OpenSearchException openSearchException = extractOpenSearchException(e);
static void recordOperationFailure(String metricNamePrefix, Throwable t) {
OpenSearchException openSearchException = extractOpenSearchException(t);
int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500;
if (openSearchException != null) {
CustomLogging.logError(new OperationMessage("OpenSearch Operation failed.", statusCode), openSearchException);
} else {
CustomLogging.logError("OpenSearch Operation failed with an exception.", e);
CustomLogging.logError("OpenSearch Operation failed with an exception.", t);
}
if (statusCode == 403) {
String forbiddenErrorMetricName = metricNamePrefix + ".403.count";
@@ -153,6 +153,9 @@ object FlintDataType {
// objects
case st: StructType => serializeJValue(st)

// Serialize maps as empty objects and let the map entries automap
case mt: MapType => serializeJValue(new StructType())

// array
case ArrayType(elementType, _) => serializeField(elementType, Metadata.empty)

@@ -510,6 +510,10 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
private def isSchedulerModeChanged(
originalOptions: FlintSparkIndexOptions,
updatedOptions: FlintSparkIndexOptions): Boolean = {
// Altering from manual to auto should not be interpreted as a scheduling mode change.
if (!originalOptions.options.contains(SCHEDULER_MODE.toString)) {
return false
}
updatedOptions.isExternalSchedulerEnabled() != originalOptions.isExternalSchedulerEnabled()
}

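The guard added to `isSchedulerModeChanged` above boils down to: if the original index options never set a scheduler mode, enabling one later is not treated as a mode change. A plain-Scala sketch of just that rule, using a `Map` in place of `FlintSparkIndexOptions` (the literal key `scheduler_mode` and the value `external` are assumptions for illustration):

```scala
// Minimal sketch of the scheduler-mode-change rule, decoupled from FlintSparkIndexOptions.
object SchedulerModeChangeSketch extends App {
  def isExternal(options: Map[String, String]): Boolean =
    options.get("scheduler_mode").contains("external")

  def isSchedulerModeChanged(original: Map[String, String], updated: Map[String, String]): Boolean =
    if (!original.contains("scheduler_mode")) false // manual -> auto: not a mode change
    else isExternal(original) != isExternal(updated)

  // Enabling external scheduling on an index that never set the option: not a "change".
  assert(!isSchedulerModeChanged(Map.empty, Map("scheduler_mode" -> "external")))
  // Flipping an explicit internal mode to external: a real mode change.
  assert(isSchedulerModeChanged(Map("scheduler_mode" -> "internal"), Map("scheduler_mode" -> "external")))
  println("scheduler-mode sketch assertions passed")
}
```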
@@ -12,6 +12,7 @@ import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.flint.config.{FlintConfigEntry, FlintSparkConf}
import org.apache.spark.sql.flint.config.FlintSparkConf.{EXTERNAL_SCHEDULER_ENABLED, HYBRID_SCAN_ENABLED, METADATA_CACHE_WRITE}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
import org.apache.spark.sql.test.SharedSparkSession

trait FlintSuite extends SharedSparkSession {
@@ -30,6 +31,7 @@ trait FlintSuite extends SharedSparkSession {
.set(
FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key,
"org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest")
.set(WAREHOUSE_PATH.key, s"spark-warehouse/${suiteName}")
conf
}

@@ -128,6 +128,21 @@ class FlintDataTypeSuite extends FlintSuite with Matchers {
|}""".stripMargin)
}

test("spark map type serialize") {
val sparkStructType = StructType(
StructField("mapField", MapType(StringType, StringType), true) ::
Nil)

FlintDataType.serialize(sparkStructType) shouldBe compactJson("""{
| "properties": {
| "mapField": {
| "properties": {
| }
| }
| }
|}""".stripMargin)
}

test("spark varchar and char type serialize") {
val flintDataType = """{
| "properties": {
@@ -81,36 +81,42 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
}
}

def createJobOperator(query: String, jobRunId: String): JobOperator = {
val streamingRunningCount = new AtomicInteger(0)

/*
* Because we cannot test from FlintJob.main() for the reason below, we have to manually
* configure all Spark conf required by the underlying Flint code.
*/
spark.conf.set(DATA_SOURCE_NAME.key, dataSourceName)
spark.conf.set(JOB_TYPE.key, FlintJobType.STREAMING)

val job = JobOperator(
appId,
jobRunId,
spark,
query,
queryId,
dataSourceName,
resultIndex,
FlintJobType.STREAMING,
streamingRunningCount)
job.terminateJVM = false
job
}

def startJob(query: String, jobRunId: String): Future[Unit] = {
val prefix = "flint-job-test"
val threadPool = ThreadUtils.newDaemonThreadPoolScheduledExecutor(prefix, 1)
implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
val streamingRunningCount = new AtomicInteger(0)

val futureResult = Future {
/*
* Because we cannot test from FlintJob.main() for the reason below, we have to manually
* configure all Spark conf required by the underlying Flint code.
*/
spark.conf.set(DATA_SOURCE_NAME.key, dataSourceName)
spark.conf.set(JOB_TYPE.key, FlintJobType.STREAMING)

/**
* FlintJob.main() is not called because we need to manually set these variables within a
* JobOperator instance to accommodate specific runtime requirements.
*/
val job =
JobOperator(
appId,
jobRunId,
spark,
query,
queryId,
dataSourceName,
resultIndex,
FlintJobType.STREAMING,
streamingRunningCount)
job.terminateJVM = false
val job = createJobOperator(query, jobRunId)
job.start()
}
futureResult.onComplete {
@@ -291,6 +297,10 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
}

test("create skipping index with non-existent table") {
val prefix = "flint-job-test"
val threadPool = ThreadUtils.newDaemonThreadPoolScheduledExecutor(prefix, 1)
implicit val executionContext = ExecutionContext.fromExecutor(threadPool)

val query =
s"""
| CREATE SKIPPING INDEX ON testTable
@@ -303,7 +313,9 @@
| """.stripMargin
val queryStartTime = System.currentTimeMillis()
val jobRunId = "00ff4o3b5091080r"
threadLocalFuture.set(startJob(query, jobRunId))

val job = createJobOperator(query, jobRunId)
threadLocalFuture.set(Future(job.start()))

val validation: REPLResult => Boolean = result => {
assert(
@@ -315,6 +327,9 @@

assert(result.status == "FAILED", s"expected status is FAILED, but got ${result.status}")
assert(!result.error.isEmpty, s"we expect error, but got ${result.error}")
assert(
job.throwableHandler.error.contains("Table spark_catalog.default.testTable is not found"),
"Expected error message to mention 'spark_catalog.default.testTable is not found'")
commonAssert(result, jobRunId, query, queryStartTime)
true
}
@@ -618,6 +618,44 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
flint.queryIndex(testIndex).collect().toSet should have size 2
}

test("update full refresh index to auto refresh should start job with external scheduler") {
setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, "true")

withTempDir { checkpointDir =>
// Create full refresh Flint index
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(FlintSparkIndexOptions(Map("auto_refresh" -> "false")), testIndex)
.create()

spark.streams.active.find(_.name == testIndex) shouldBe empty
flint.queryIndex(testIndex).collect().toSet should have size 0
val indexInitial = flint.describeIndex(testIndex).get
indexInitial.options.isExternalSchedulerEnabled() shouldBe false

val updatedIndex = flint
.skippingIndex()
.copyWithUpdate(
indexInitial,
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"checkpoint_location" -> checkpointDir.getAbsolutePath)))

val jobId = flint.updateIndex(updatedIndex)
jobId shouldBe empty
val indexFinal = flint.describeIndex(testIndex).get
indexFinal.options.isExternalSchedulerEnabled() shouldBe true
indexFinal.options.autoRefresh() shouldBe true
indexFinal.options.refreshInterval() shouldBe Some(
FlintOptions.DEFAULT_EXTERNAL_SCHEDULER_INTERVAL)

verifySchedulerIndex(testIndex, 5, "MINUTES")
}
}

test("update incremental refresh index to auto refresh should start job") {
withTempDir { checkpointDir =>
// Create incremental refresh Flint index and wait for complete
@@ -667,6 +705,51 @@
}
}

test(
"update incremental refresh index to auto refresh should start job with external scheduler") {
setFlintSparkConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED, "true")

withTempDir { checkpointDir =>
// Create incremental refresh Flint index
flint
.skippingIndex()
.onTable(testTable)
.addPartitions("year", "month")
.options(
FlintSparkIndexOptions(
Map(
"incremental_refresh" -> "true",
"checkpoint_location" -> checkpointDir.getAbsolutePath)),
testIndex)
.create()

spark.streams.active.find(_.name == testIndex) shouldBe empty
flint.queryIndex(testIndex).collect().toSet should have size 0
val indexInitial = flint.describeIndex(testIndex).get
indexInitial.options.isExternalSchedulerEnabled() shouldBe false

val updatedIndex = flint
.skippingIndex()
.copyWithUpdate(
indexInitial,
FlintSparkIndexOptions(
Map(
"auto_refresh" -> "true",
"incremental_refresh" -> "false",
"checkpoint_location" -> checkpointDir.getAbsolutePath)))

val jobId = flint.updateIndex(updatedIndex)
jobId shouldBe empty
val indexFinal = flint.describeIndex(testIndex).get
indexFinal.options.isExternalSchedulerEnabled() shouldBe true
indexFinal.options.autoRefresh() shouldBe true
indexFinal.options.refreshInterval() shouldBe Some(
FlintOptions.DEFAULT_EXTERNAL_SCHEDULER_INTERVAL)

verifySchedulerIndex(testIndex, 5, "MINUTES")
}
}

test("update auto refresh index to full refresh should stop job") {
// Create auto refresh Flint index and wait for complete
flint
