Implement FlintJob Logic for EMR-S #52
Conversation
This commit introduces FlintJob logic for EMR-S, mirroring the existing SQLJob implementation for the EMR cluster. The key differences in FlintJob are:
1. It reads OpenSearch host information from Spark command parameters.
2. It ensures a result index with the correct mapping exists in OpenSearch, creating it if necessary. This check runs in parallel with the SQL query execution.
3. It reports an error if the result index mapping is incorrect.
4. It saves a failure status if the SQL execution fails.

Testing:
1. Manual testing was conducted using the EMR-S CLI.
2. New unit tests were added to verify the functionality.

Signed-off-by: Kaituo Li <[email protected]>
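For orientation, here is a compact sketch of the flow those four points describe, stitched together from the diff excerpts quoted later in this thread. Helper names such as createSparkSession, executeQuery, writeData, and checkAndCreateIndex are taken from (or assumed to match) those excerpts; this is a sketch, not the exact upstream FlintJob code.

// Sketch only: mirrors the structure discussed in this review, not the merged FlintJob.
// Assumes the file sits under org.apache.spark.sql (as FlintJob.scala does) so that
// Spark's internal ThreadUtils is accessible; checkAndCreateIndex is a hypothetical helper.
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.{Duration, MINUTES}

import org.apache.spark.SparkConf
import org.apache.spark.util.ThreadUtils

def runFlintJob(conf: SparkConf, query: String, dataSource: String, resultIndex: String): Unit = {
  // 1. OpenSearch host information arrives via Spark command parameters inside `conf`.
  val spark = createSparkSession(conf)

  // 2. Verify (or create) the result index on a dedicated thread while the query runs.
  val threadPool = ThreadUtils.newDaemonFixedThreadPool(1, "check-create-index")
  implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutor(threadPool)
  val futureMappingCheck = Future { checkAndCreateIndex(resultIndex) } // hypothetical helper

  try {
    val data = executeQuery(spark, query, dataSource)
    // 3. An incorrect result-index mapping surfaces here as an error
    //    (timeout reduced from 10 minutes to 1 minute during this review).
    val correctMapping = ThreadUtils.awaitResult(futureMappingCheck, Duration(1, MINUTES))
    // 4. On query failure, a FAILED status row is written instead (see the later thread).
    writeData(spark, data, resultIndex, correctMapping, dataSource)
  } finally {
    threadPool.shutdown()
    spark.stop()
  }
}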
StructField("jobRunId", StringType, nullable = true), | ||
StructField("applicationId", StringType, nullable = true), | ||
StructField("dataSourceName", StringType, nullable = true), | ||
StructField("status", StringType, nullable = true) |
Can we update the documentation with the status field as well?
added
val data = executeQuery(spark, query, dataSource)

val correctMapping = ThreadUtils.awaitResult(futureMappingCheck, Duration(10, MINUTES))
writeData(spark, data, resultIndex, correctMapping, dataSource)
Can we write errors from executeQuery back to the result index? This way we can propagate some information to the user when the query fails.
added errors in the result index
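One hedged sketch of what "errors in the result index" can look like, assuming a plain error column; the column names used here are illustrative, not copied from the PR.

// Illustrative only: wrap executeQuery so a failure still yields a row that writeData
// can persist. The "error" and "status" column names here are assumptions.
import org.apache.spark.sql.{DataFrame, SparkSession}

def executeQueryOrError(spark: SparkSession, query: String, dataSource: String): DataFrame = {
  try {
    executeQuery(spark, query, dataSource) // helper from the diff above
  } catch {
    case e: Exception =>
      import spark.implicits._
      // Single row carrying the failure back to the result index for the user to inspect.
      Seq((dataSource, "FAILED", e.getMessage)).toDF("dataSourceName", "status", "error")
  }
}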
val spark = createSparkSession(conf)

val threadPool = ThreadUtils.newDaemonFixedThreadPool(1, "check-create-index")
implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
For my understanding: if we don't provide the above thread pool, I am assuming there is a default thread pool? If yes, how many threads does it have (number of cores)? Does this overwrite the default thread pool?
I don't have much acquaintance with Scala, but we are not using this variable further down the file. Is this executionContext implicitly used when we submit a task to Future?
Yes, the executionContext is implicitly used when submitting a task to a Future. We have to use a new thread pool; using the global thread pool fails scalafmt. This does not overwrite the default thread pool; it creates a new one.
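A self-contained illustration of that implicit resolution (plain Java Executors instead of Spark's ThreadUtils, purely for the example):

// Future.apply takes an implicit ExecutionContext, so the implicit val in scope decides
// which pool runs the task even though it is never referenced by name afterwards.
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ImplicitEcDemo extends App {
  val pool = Executors.newFixedThreadPool(1)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

  val task = Future { Thread.currentThread().getName } // picks up `ec` implicitly
  println(Await.result(task, 10.seconds))              // prints the name of a thread from `pool`
  pool.shutdown()
}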
}
val data = executeQuery(spark, query, dataSource)

val correctMapping = ThreadUtils.awaitResult(futureMappingCheck, Duration(10, MINUTES))
Shouldn't we reduce this to a minute? 10 minutes is too long to wait for an OpenSearch mapping query; OpenSearch itself times out after 60 seconds.
I agree with the approach, but do we really gain substantially by parallelizing the mapping creation part?
reduced to 1 minute.
Not sure how much we gain, but we are trying to keep the query under 10~20 seconds. Every few seconds saved is good.
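A toy illustration of the gain from overlapping the two tasks (timings invented): wall-clock time becomes roughly max(query, mapping check) instead of their sum, and the wait is now bounded by one minute.

// Toy timing only; real numbers depend on the cluster and OpenSearch latency.
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object OverlapDemo extends App {
  val pool = Executors.newFixedThreadPool(1)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

  val start = System.nanoTime()
  val mappingCheck = Future { Thread.sleep(2000); true } // simulated index/mapping check
  Thread.sleep(3000)                                     // simulated query execution on the main thread
  val ok = Await.result(mappingCheck, 1.minute)          // bounded wait, as in the updated code
  println(f"mapping ok=$ok, elapsed=${(System.nanoTime() - start) / 1e9}%.1f s") // ~3 s, not ~5 s
  pool.shutdown()
}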
build.sbt (Outdated)
@@ -114,6 +114,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
"com.github.sbt" % "junit-interface" % "0.13.3" % "test"),
libraryDependencies ++= deps(sparkVersion),
libraryDependencies += "com.typesafe.play" %% "play-json" % "2.9.2",
what is it for?
removed
"""{ | ||
"dynamic": false, | ||
"properties": { | ||
"result": { |
result contains plain JSON, with no search requirement. How about using "enabled": false?
added "enabled": false
}
}
},
"schema": {
schema contains plain JSON, with no search requirement. How about using "enabled": false?
added "enabled": false
val inputJson = Json.parse(input)
val mappingJson = Json.parse(mapping)
logInfo(s"inputJson $inputJson")
logInfo(s"mappingJson $mappingJson")
Logging the mapping may have security concerns.
removed
spark.createDataFrame(rows).toDF(schema.fields.map(_.name): _*)
}

def isSuperset(input: String, mapping: String): Boolean = {
Why not simply check for an exact match?
As discussed offline, there is a strange issue with the getIndexMetadata output where booleans are converted to Strings but integers stay as integers. Will keep the current code.
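For readers unfamiliar with the check, here is a hedged sketch of the superset idea over already-parsed JSON objects, represented as plain Scala Maps; this is not the upstream isSuperset implementation.

// Every key required by `mapping` must appear in `input` with a matching value,
// recursing into nested objects; extra keys in `input` are allowed.
def containsMapping(input: Map[String, Any], mapping: Map[String, Any]): Boolean =
  mapping.forall { case (key, required) =>
    (input.get(key), required) match {
      case (Some(actual: Map[_, _]), req: Map[_, _]) =>
        containsMapping(
          actual.asInstanceOf[Map[String, Any]],
          req.asInstanceOf[Map[String, Any]])
      case (Some(actual), req) => actual == req
      case (None, _) => false
    }
  }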
"_index" : ".query_execution_result", | ||
"_id" : "A2WOsYgBMUoqCqlDJHrn", | ||
"_score" : 1.0, | ||
"_source" : { |
missing status field.
added
throw e
} finally {
  // Stop SparkSession
  spark.stop()
Create Skipping Index is a Spark Structured Streaming query, so the main thread should wait. In case the plugin sets EMR-S JobTimeout = 0 for a long streaming query, the plugin can also pass an isStreaming parameter to FlintJob:
if (isStreamingQuery) {
spark.streams.awaitAnyTermination()
}
For your reference: #53
Plugin-related change.
changed
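A hedged sketch of the wiring suggested above; the isStreaming flag and how it reaches FlintJob are assumptions for illustration.

// Keep the driver alive for streaming statements (e.g. creating a skipping index)
// instead of stopping the SparkSession immediately; batch queries fall through.
import org.apache.spark.sql.SparkSession

def awaitIfStreaming(spark: SparkSession, isStreaming: Boolean): Unit = {
  if (isStreaming) {
    // Blocks until any active streaming query terminates (rethrows its failure, if any).
    spark.streams.awaitAnyTermination()
  }
}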
val input: DataFrame =
  spark.createDataFrame(spark.sparkContext.parallelize(inputRows), inputSchema)

test("Test getFormattedData method") {
Add tests for explain and ddl:
- explain: explain select * from table
- ddl: create table ...
will do it later to unblock testing
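For reference, a rough sketch of what those follow-up tests could look like; these are placeholders that assume the SparkSession and test style of the quoted test file, and are not part of this PR.

// Placeholder tests only: exercise an EXPLAIN and a DDL statement through the same
// session the existing FlintJob tests use.
test("explain query returns the plan as a single row") {
  val df = spark.sql("EXPLAIN SELECT 1")
  assert(df.count() == 1)
}

test("ddl query creates a table that the catalog can see") {
  spark.sql("CREATE TABLE IF NOT EXISTS flint_job_ddl_test (id INT) USING parquet")
  assert(spark.catalog.tableExists("flint_job_ddl_test"))
}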
Signed-off-by: Kaituo Li <[email protected]>
Description
This commit introduces FlintJob logic for EMR-S, mirroring the existing SQLJob implementation for the EMR cluster. The key differences in FlintJob are:
1. It reads OpenSearch host information from Spark command parameters.
2. It ensures a result index with the correct mapping exists in OpenSearch, creating it if necessary. This check runs in parallel with the SQL query execution.
3. It reports an error if the result index mapping is incorrect.
4. It saves a failure status if the SQL execution fails.

Testing:
1. Manual testing was conducted using the EMR-S CLI.
2. New unit tests were added to verify the functionality.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.