From 3c8a49041a69334f712c53b223a794d48acab084 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Thu, 25 Jul 2024 17:01:04 -0700 Subject: [PATCH] [Bugfix] Insights on query execution error (#475) * BugFix: Add error logs Signed-off-by: Louis Chu * Add IT Signed-off-by: Louis Chu * Fix IT Signed-off-by: Louis Chu * Log stacktrace Signed-off-by: Louis Chu * Use full msg instead of prefix Signed-off-by: Louis Chu --------- Signed-off-by: Louis Chu --- .../apache/spark/sql/FlintREPLITSuite.scala | 63 +++++++++++++++++++ .../apache/spark/sql/FlintJobExecutor.scala | 22 ++++--- 2 files changed, 75 insertions(+), 10 deletions(-) diff --git a/integ-test/src/test/scala/org/apache/spark/sql/FlintREPLITSuite.scala b/integ-test/src/test/scala/org/apache/spark/sql/FlintREPLITSuite.scala index 1c0b27674..921db792a 100644 --- a/integ-test/src/test/scala/org/apache/spark/sql/FlintREPLITSuite.scala +++ b/integ-test/src/test/scala/org/apache/spark/sql/FlintREPLITSuite.scala @@ -422,6 +422,69 @@ class FlintREPLITSuite extends SparkFunSuite with OpenSearchSuite with JobTest { } } + test("create table with dummy location should fail with expected error message") { + try { + createSession(jobRunId, "") + threadLocalFuture.set(startREPL()) + + val dummyLocation = "s3://path/to/dummy/location" + val testQueryId = "110" + val createTableStatement = + s""" + | CREATE TABLE $testTable + | ( + | name STRING, + | age INT + | ) + | USING CSV + | LOCATION '$dummyLocation' + | OPTIONS ( + | header 'false', + | delimiter '\\t' + | ) + |""".stripMargin + val createTableStatementId = + submitQuery(s"${makeJsonCompliant(createTableStatement)}", testQueryId) + + val createTableStatementValidation: REPLResult => Boolean = result => { + assert( + result.results.size == 0, + s"expected result size is 0, but got ${result.results.size}") + assert( + result.schemas.size == 0, + s"expected schema size is 0, but got ${result.schemas.size}") + failureValidation(result) + true + } + 
pollForResultAndAssert(createTableStatementValidation, testQueryId) + assert( + !awaitConditionForStatementOrTimeout( + statement => { + statement.error match { + case Some(error) + if error == """{"Message":"Fail to run query. Cause: No FileSystem for scheme \"s3\""}""" => + // Assertion passed + case _ => + fail(s"Statement error is: ${statement.error}") + } + statement.state == "failed" + }, + createTableStatementId), + s"Fail to verify for $createTableStatementId.") + // clean up + val dropStatement = + s"""DROP TABLE $testTable""".stripMargin + submitQuery(s"${makeJsonCompliant(dropStatement)}", "999") + } catch { + case e: Exception => + logError("Unexpected exception", e) + assert(false, "Unexpected exception") + } finally { + waitREPLStop(threadLocalFuture.get()) + threadLocalFuture.remove() + } + } + /** * JSON does not support raw newlines (\n) in string values. All newlines must be escaped or * removed when inside a JSON string. The same goes for tab characters, which should be diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala index f38a27ef4..00f023694 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala @@ -436,18 +436,22 @@ trait FlintJobExecutor { private def handleQueryException( e: Exception, - message: String, + messagePrefix: String, errorSource: Option[String] = None, statusCode: Option[Int] = None): String = { - - val errorDetails = Map("Message" -> s"$message: ${e.getMessage}") ++ + val errorMessage = s"$messagePrefix: ${e.getMessage}" + val errorDetails = Map("Message" -> errorMessage) ++ errorSource.map("ErrorSource" -> _) ++ statusCode.map(code => "StatusCode" -> code.toString) val errorJson = mapper.writeValueAsString(errorDetails) - statusCode.foreach { code => - 
CustomLogging.logError(new OperationMessage("", code), e) + // CustomLogging will call log4j logger.error() underneath + statusCode match { + case Some(code) => + CustomLogging.logError(new OperationMessage(errorMessage, code), e) + case None => + CustomLogging.logError(errorMessage, e) } errorJson @@ -491,16 +495,14 @@ trait FlintJobExecutor { case r: SparkException => handleQueryException(r, ExceptionMessages.SparkExceptionErrorPrefix) case r: Exception => - val rootCauseClassName = ex.getClass.getName - val errMsg = ex.getMessage - logDebug(s"Root cause class name: $rootCauseClassName") - logDebug(s"Root cause error message: $errMsg") + val rootCauseClassName = r.getClass.getName + val errMsg = r.getMessage if (rootCauseClassName == "org.apache.hadoop.hive.metastore.api.MetaException" && errMsg.contains("com.amazonaws.services.glue.model.AccessDeniedException")) { val e = new SecurityException(ExceptionMessages.GlueAccessDeniedMessage) handleQueryException(e, ExceptionMessages.QueryRunErrorPrefix) } else { - handleQueryException(ex, ExceptionMessages.QueryRunErrorPrefix) + handleQueryException(r, ExceptionMessages.QueryRunErrorPrefix) } } }