[Backport 0.6] [Bugfix] Record statement execution error #941

Merged · 1 commit · Nov 21, 2024
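In brief, the backported fix makes a statement's execution error observable after it has been processed: FlintJobExecutor now records the processed error JSON on its throwable handler before logging it, and FlintJobITSuite is refactored so a test can hold a reference to the JobOperator and assert on job.throwableHandler.error. Hedged sketches of both pieces follow the respective diffs below.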
FlintJobITSuite.scala

@@ -81,36 +81,42 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
     }
   }
 
+  def createJobOperator(query: String, jobRunId: String): JobOperator = {
+    val streamingRunningCount = new AtomicInteger(0)
+
+    /*
+     * Because we cannot test from FlintJob.main() for the reason below, we have to configure
+     * all Spark conf required by Flint code underlying manually.
+     */
+    spark.conf.set(DATA_SOURCE_NAME.key, dataSourceName)
+    spark.conf.set(JOB_TYPE.key, FlintJobType.STREAMING)
+
+    val job = JobOperator(
+      appId,
+      jobRunId,
+      spark,
+      query,
+      queryId,
+      dataSourceName,
+      resultIndex,
+      FlintJobType.STREAMING,
+      streamingRunningCount)
+    job.terminateJVM = false
+    job
+  }
+
   def startJob(query: String, jobRunId: String): Future[Unit] = {
     val prefix = "flint-job-test"
     val threadPool = ThreadUtils.newDaemonThreadPoolScheduledExecutor(prefix, 1)
     implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
-    val streamingRunningCount = new AtomicInteger(0)
 
     val futureResult = Future {
-      /*
-       * Because we cannot test from FlintJob.main() for the reason below, we have to configure
-       * all Spark conf required by Flint code underlying manually.
-       */
-      spark.conf.set(DATA_SOURCE_NAME.key, dataSourceName)
-      spark.conf.set(JOB_TYPE.key, FlintJobType.STREAMING)
-
-      /**
-       * FlintJob.main() is not called because we need to manually set these variables within a
-       * JobOperator instance to accommodate specific runtime requirements.
-       */
-      val job =
-        JobOperator(
-          appId,
-          jobRunId,
-          spark,
-          query,
-          queryId,
-          dataSourceName,
-          resultIndex,
-          FlintJobType.STREAMING,
-          streamingRunningCount)
-      job.terminateJVM = false
+      val job = createJobOperator(query, jobRunId)
       job.start()
     }
     futureResult.onComplete {
@@ -291,6 +297,10 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
   }
 
   test("create skipping index with non-existent table") {
+    val prefix = "flint-job-test"
+    val threadPool = ThreadUtils.newDaemonThreadPoolScheduledExecutor(prefix, 1)
+    implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
+
     val query =
       s"""
          | CREATE SKIPPING INDEX ON testTable
@@ -303,7 +313,9 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
| """.stripMargin
val queryStartTime = System.currentTimeMillis()
val jobRunId = "00ff4o3b5091080r"
threadLocalFuture.set(startJob(query, jobRunId))

val job = createJobOperator(query, jobRunId)
threadLocalFuture.set(Future(job.start()))

val validation: REPLResult => Boolean = result => {
assert(
@@ -315,6 +327,9 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {

       assert(result.status == "FAILED", s"expected status is FAILED, but got ${result.status}")
       assert(!result.error.isEmpty, s"we expect error, but got ${result.error}")
+      assert(
+        job.throwableHandler.error.contains("Table spark_catalog.default.testTable is not found"),
+        "Expected error message to mention 'spark_catalog.default.testTable is not found'")
       commonAssert(result, jobRunId, query, queryStartTime)
       true
     }
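The test-side payoff of the refactor is visible above: previously the JobOperator was built inside startJob's Future closure, so the test had no reference to it; extracting createJobOperator lets the test build the job itself, wrap only start() in a Future (hence the thread-pool setup added at the top of the test), and assert on the error the executor recorded. A minimal, self-contained sketch of that pattern — SketchJob, createSketchJob, and recordedError are illustrative stand-ins, not the repo's types:

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Stand-in for JobOperator: start() records its failure instead of exposing
// it only through logs or the result index.
final class SketchJob {
  @volatile var recordedError: String = ""
  def start(): Unit =
    recordedError = """{"Message":"Table spark_catalog.default.testTable is not found"}"""
}

object SketchUsage {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Stand-in for createJobOperator: construction lives in a helper, so the
  // test keeps the reference instead of losing it inside a Future closure.
  def createSketchJob(): SketchJob = new SketchJob()

  def main(args: Array[String]): Unit = {
    val job = createSketchJob()
    val run = Future(job.start()) // only the run itself is asynchronous
    Await.ready(run, 10.seconds)
    assert(job.recordedError.contains("is not found")) // mirrors the new assertion
  }
}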
FlintJobExecutor.scala

@@ -450,7 +450,8 @@ trait FlintJobExecutor {
     statusCode.foreach(code => errorDetails.put("StatusCode", code.toString))
 
     val errorJson = mapper.writeValueAsString(errorDetails)
-
+    // Record the processed error message
+    throwableHandler.setError(errorJson)
     // CustomLogging will call log4j logger.error() underneath
     statusCode match {
       case Some(code) =>
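The executor-side fix is the two added lines above: the fully processed error JSON, not just the raw throwable, is stored via throwableHandler.setError(errorJson) before CustomLogging emits it, which is what the new test assertion reads back. A minimal sketch of that record-and-read-back shape — only setError and error appear in this diff, so the real ThrowableHandler in the repo may look different:

import java.util.concurrent.atomic.AtomicReference

// Sketch only: keep the last processed error message so callers (here, the
// integration test) can read it back after a statement fails.
final class ThrowableHandlerSketch {
  private val recorded = new AtomicReference[String]("")

  // Called where the diff calls throwableHandler.setError(errorJson)
  def setError(error: String): Unit = recorded.set(error)

  // Read back as job.throwableHandler.error in the new test assertion
  def error: String = recorded.get()

  def hasError: Boolean = recorded.get().nonEmpty
}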