Skip to content

Commit

Permalink
[Bugfix] Insights on query execution error (opensearch-project#475)
Browse files Browse the repository at this point in the history
* BugFix: Add error logs

Signed-off-by: Louis Chu <[email protected]>

* Add IT

Signed-off-by: Louis Chu <[email protected]>

* Fix IT

Signed-off-by: Louis Chu <[email protected]>

* Log stacktrace

Signed-off-by: Louis Chu <[email protected]>

* Use full msg instead of prefix

Signed-off-by: Louis Chu <[email protected]>

---------

Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger authored Jul 26, 2024
1 parent 538dd54 commit 3c8a490
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,69 @@ class FlintREPLITSuite extends SparkFunSuite with OpenSearchSuite with JobTest {
}
}

test("create table with dummy location should fail with excepted error message") {
try {
createSession(jobRunId, "")
threadLocalFuture.set(startREPL())

val dummyLocation = "s3://path/to/dummy/location"
val testQueryId = "110"
val createTableStatement =
s"""
| CREATE TABLE $testTable
| (
| name STRING,
| age INT
| )
| USING CSV
| LOCATION '$dummyLocation'
| OPTIONS (
| header 'false',
| delimiter '\\t'
| )
|""".stripMargin
val createTableStatementId =
submitQuery(s"${makeJsonCompliant(createTableStatement)}", testQueryId)

val createTableStatementValidation: REPLResult => Boolean = result => {
assert(
result.results.size == 0,
s"expected result size is 0, but got ${result.results.size}")
assert(
result.schemas.size == 0,
s"expected schema size is 0, but got ${result.schemas.size}")
failureValidation(result)
true
}
pollForResultAndAssert(createTableStatementValidation, testQueryId)
assert(
!awaitConditionForStatementOrTimeout(
statement => {
statement.error match {
case Some(error)
if error == """{"Message":"Fail to run query. Cause: No FileSystem for scheme \"s3\""}""" =>
// Assertion passed
case _ =>
fail(s"Statement error is: ${statement.error}")
}
statement.state == "failed"
},
createTableStatementId),
s"Fail to verify for $createTableStatementId.")
// clean up
val dropStatement =
s"""DROP TABLE $testTable""".stripMargin
submitQuery(s"${makeJsonCompliant(dropStatement)}", "999")
} catch {
case e: Exception =>
logError("Unexpected exception", e)
assert(false, "Unexpected exception")
} finally {
waitREPLStop(threadLocalFuture.get())
threadLocalFuture.remove()
}
}

/**
* JSON does not support raw newlines (\n) in string values. All newlines must be escaped or
* removed when inside a JSON string. The same goes for tab characters, which should be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,18 +436,22 @@ trait FlintJobExecutor {

private def handleQueryException(
e: Exception,
message: String,
messagePrefix: String,
errorSource: Option[String] = None,
statusCode: Option[Int] = None): String = {

val errorDetails = Map("Message" -> s"$message: ${e.getMessage}") ++
val errorMessage = s"$messagePrefix: ${e.getMessage}"
val errorDetails = Map("Message" -> errorMessage) ++
errorSource.map("ErrorSource" -> _) ++
statusCode.map(code => "StatusCode" -> code.toString)

val errorJson = mapper.writeValueAsString(errorDetails)

statusCode.foreach { code =>
CustomLogging.logError(new OperationMessage("", code), e)
// CustomLogging will call log4j logger.error() underneath
statusCode match {
case Some(code) =>
CustomLogging.logError(new OperationMessage(errorMessage, code), e)
case None =>
CustomLogging.logError(errorMessage, e)
}

errorJson
Expand Down Expand Up @@ -491,16 +495,14 @@ trait FlintJobExecutor {
case r: SparkException =>
handleQueryException(r, ExceptionMessages.SparkExceptionErrorPrefix)
case r: Exception =>
val rootCauseClassName = ex.getClass.getName
val errMsg = ex.getMessage
logDebug(s"Root cause class name: $rootCauseClassName")
logDebug(s"Root cause error message: $errMsg")
val rootCauseClassName = r.getClass.getName
val errMsg = r.getMessage
if (rootCauseClassName == "org.apache.hadoop.hive.metastore.api.MetaException" &&
errMsg.contains("com.amazonaws.services.glue.model.AccessDeniedException")) {
val e = new SecurityException(ExceptionMessages.GlueAccessDeniedMessage)
handleQueryException(e, ExceptionMessages.QueryRunErrorPrefix)
} else {
handleQueryException(ex, ExceptionMessages.QueryRunErrorPrefix)
handleQueryException(r, ExceptionMessages.QueryRunErrorPrefix)
}
}
}
Expand Down

0 comments on commit 3c8a490

Please sign in to comment.