From edea0decdffe6e56ba24289879e50844a4955cb6 Mon Sep 17 00:00:00 2001
From: "github-actions[bot]"
Date: Thu, 21 Nov 2024 04:10:02 +0000
Subject: [PATCH] Record statement execution error (#940)

(cherry picked from commit a14013c2f93e7faf80a04bd2c1e21d440d57420d)

Signed-off-by: github-actions[bot]
---
 .../apache/spark/sql/FlintJobITSuite.scala    | 55 ++++++++++++-------
 .../apache/spark/sql/FlintJobExecutor.scala   |  3 +-
 2 files changed, 37 insertions(+), 21 deletions(-)

diff --git a/integ-test/src/integration/scala/org/apache/spark/sql/FlintJobITSuite.scala b/integ-test/src/integration/scala/org/apache/spark/sql/FlintJobITSuite.scala
index 11bc7271c..81bf60f5e 100644
--- a/integ-test/src/integration/scala/org/apache/spark/sql/FlintJobITSuite.scala
+++ b/integ-test/src/integration/scala/org/apache/spark/sql/FlintJobITSuite.scala
@@ -81,36 +81,42 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
     }
   }
 
+  def createJobOperator(query: String, jobRunId: String): JobOperator = {
+    val streamingRunningCount = new AtomicInteger(0)
+
+    /*
+     * Because we cannot test from FlintJob.main() for the reason below, we have to configure
+     * all Spark conf required by Flint code underlying manually.
+     */
+    spark.conf.set(DATA_SOURCE_NAME.key, dataSourceName)
+    spark.conf.set(JOB_TYPE.key, FlintJobType.STREAMING)
+
+    val job = JobOperator(
+      appId,
+      jobRunId,
+      spark,
+      query,
+      queryId,
+      dataSourceName,
+      resultIndex,
+      FlintJobType.STREAMING,
+      streamingRunningCount)
+    job.terminateJVM = false
+    job
+  }
+
   def startJob(query: String, jobRunId: String): Future[Unit] = {
     val prefix = "flint-job-test"
     val threadPool = ThreadUtils.newDaemonThreadPoolScheduledExecutor(prefix, 1)
     implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
-    val streamingRunningCount = new AtomicInteger(0)
 
     val futureResult = Future {
-      /*
-       * Because we cannot test from FlintJob.main() for the reason below, we have to configure
-       * all Spark conf required by Flint code underlying manually.
-       */
-      spark.conf.set(DATA_SOURCE_NAME.key, dataSourceName)
-      spark.conf.set(JOB_TYPE.key, FlintJobType.STREAMING)
 
       /**
        * FlintJob.main() is not called because we need to manually set these variables within a
       * JobOperator instance to accommodate specific runtime requirements.
       */
-      val job =
-        JobOperator(
-          appId,
-          jobRunId,
-          spark,
-          query,
-          queryId,
-          dataSourceName,
-          resultIndex,
-          FlintJobType.STREAMING,
-          streamingRunningCount)
-      job.terminateJVM = false
+      val job = createJobOperator(query, jobRunId)
       job.start()
     }
 
     futureResult.onComplete {
@@ -291,6 +297,10 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
   }
 
   test("create skipping index with non-existent table") {
+    val prefix = "flint-job-test"
+    val threadPool = ThreadUtils.newDaemonThreadPoolScheduledExecutor(prefix, 1)
+    implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
+
     val query =
       s"""
         | CREATE SKIPPING INDEX ON testTable
@@ -303,7 +313,9 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
         | """.stripMargin
     val queryStartTime = System.currentTimeMillis()
     val jobRunId = "00ff4o3b5091080r"
-    threadLocalFuture.set(startJob(query, jobRunId))
+
+    val job = createJobOperator(query, jobRunId)
+    threadLocalFuture.set(Future(job.start()))
 
     val validation: REPLResult => Boolean = result => {
       assert(
@@ -315,6 +327,9 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
 
       assert(result.status == "FAILED", s"expected status is FAILED, but got ${result.status}")
       assert(!result.error.isEmpty, s"we expect error, but got ${result.error}")
+      assert(
+        job.throwableHandler.error.contains("Table spark_catalog.default.testTable is not found"),
+        "Expected error message to mention 'spark_catalog.default.testTable is not found'")
       commonAssert(result, jobRunId, query, queryStartTime)
       true
     }
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
index 63c120a2c..a9bb6f5bb 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
@@ -450,7 +450,8 @@ trait FlintJobExecutor {
     statusCode.foreach(code => errorDetails.put("StatusCode", code.toString))
 
     val errorJson = mapper.writeValueAsString(errorDetails)
-
+    // Record the processed error message
+    throwableHandler.setError(errorJson)
     // CustomLogging will call log4j logger.error() underneath
     statusCode match {
       case Some(code) =>
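
Note on the pattern this patch introduces: the key change is the call to throwableHandler.setError(errorJson) in FlintJobExecutor, which stores the processed error JSON on the handler so that callers can inspect it directly, as the new FlintJobITSuite assertion does via job.throwableHandler.error. The ThrowableHandler class itself is not part of this patch, so the following is only a minimal sketch under assumptions: SimpleThrowableHandler and ErrorRecordingSketch are hypothetical names, the handler is assumed to be a small mutable holder with setError/error accessors, and the error payload is assumed to be a JSON string built with Jackson's ObjectMapper (jackson-databind on the classpath), mirroring the hunk above.

import com.fasterxml.jackson.databind.ObjectMapper

// Hypothetical, simplified stand-in for the ThrowableHandler referenced by the patch:
// it records the most recent processed error message so that callers (for example the
// new FlintJobITSuite assertion) can inspect it later instead of parsing logs.
class SimpleThrowableHandler {
  private var recordedError: String = ""

  def setError(err: String): Unit = recordedError = err
  def error: String = recordedError
  def hasError: Boolean = recordedError.nonEmpty
}

object ErrorRecordingSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    val handler = new SimpleThrowableHandler()

    // Mirror the pattern in FlintJobExecutor: collect error details, serialize them
    // to JSON, and record the result on the handler before logging.
    val errorDetails = new java.util.LinkedHashMap[String, String]()
    errorDetails.put("Message", "Table spark_catalog.default.testTable is not found")
    errorDetails.put("StatusCode", "400") // illustrative value only

    val errorJson = mapper.writeValueAsString(errorDetails)
    handler.setError(errorJson)

    // A test can now assert on the recorded error, as the new integration test does.
    assert(handler.error.contains("testTable is not found"))
    println(handler.error)
  }
}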