diff --git a/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.java b/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.java
new file mode 100644
index 000000000..34f5dd257
--- /dev/null
+++ b/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.core.logging;
+
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+
+/**
+ * Define constants for common error messages and utility methods that turn a throwable into a
+ * sanitized, optionally truncated message taken from its root cause.
+ *
+ * <p>Constant names keep the UpperCamelCase spelling of the Scala {@code object} this class
+ * replaces, so existing references to them stay valid.
+ */
+public final class ExceptionMessages {
+  public static final String SyntaxErrorPrefix = "Syntax error";
+  public static final String S3ErrorPrefix = "Fail to read data from S3. Cause";
+  public static final String GlueErrorPrefix = "Fail to read data from Glue. Cause";
+  public static final String QueryAnalysisErrorPrefix = "Fail to analyze query. Cause";
+  public static final String SparkExceptionErrorPrefix = "Spark exception. Cause";
+  public static final String QueryRunErrorPrefix = "Fail to run query. Cause";
+  public static final String GlueAccessDeniedMessage = "Access denied in AWS Glue service. Please check permissions.";
+
+  /** Marker appended to a message that had to be cut short. */
+  private static final String TRUNCATION_SUFFIX = "...";
+
+  private ExceptionMessages() {
+    // Static holder only; never instantiated.
+  }
+
+  /**
+   * Extracts the root cause from a throwable and returns a sanitized error message.
+   *
+   * @param e The throwable from which to extract the root cause. Must not be null.
+   * @param maxLength The maximum number of characters kept from the message. Negative values are
+   *     treated as 0. When the message is cut, "..." is appended after the kept characters, so the
+   *     result can be up to three characters longer than this limit.
+   * @return A sanitized and possibly truncated error message string.
+   */
+  public static String extractRootCause(Throwable e, int maxLength) {
+    return truncateMessage(redactMessage(rootCause(e)), maxLength);
+  }
+
+  /**
+   * Extracts the root cause from a throwable and returns its sanitized message without
+   * truncation.
+   *
+   * @param e The throwable from which to extract the root cause. Must not be null.
+   * @return A sanitized error message string.
+   */
+  public static String extractRootCause(Throwable e) {
+    return redactMessage(rootCause(e));
+  }
+
+  /**
+   * Walks the cause chain to its last element. A second cursor advancing at half speed detects
+   * cyclic chains (which can be built with {@code initCause}); when one is found, the walk stops
+   * and returns the throwable at which the cycle was detected instead of looping forever.
+   */
+  private static Throwable rootCause(Throwable e) {
+    Throwable cause = e;
+    Throwable slow = e;
+    boolean advanceSlow = false;
+    Throwable next;
+    while ((next = cause.getCause()) != null && next != cause) {
+      cause = next;
+      if (cause == slow) {
+        return cause;
+      }
+      if (advanceSlow) {
+        slow = slow.getCause();
+      }
+      advanceSlow = !advanceSlow;
+    }
+    return cause;
+  }
+
+  /**
+   * Builds the message for a root cause. For an {@link AmazonS3Exception} only the service name
+   * and status code are reported, not the original message. Otherwise the first non-null value of
+   * the localized message, the message and {@code toString()} is returned.
+   */
+  private static String redactMessage(Throwable cause) {
+    if (cause instanceof AmazonS3Exception) {
+      AmazonS3Exception e = (AmazonS3Exception) cause;
+      // Plain concatenation keeps the status code in ASCII digits whatever the default locale is.
+      return S3ErrorPrefix + ": serviceName=[" + e.getServiceName()
+          + "], statusCode=[" + e.getStatusCode() + "]";
+    }
+    String localized = cause.getLocalizedMessage();
+    if (localized != null) {
+      return localized;
+    }
+    String message = cause.getMessage();
+    return message != null ? message : cause.toString();
+  }
+
+  /**
+   * Keeps at most {@code maxLength} characters of the message and appends "..." when anything was
+   * removed. A negative limit is treated as 0, and the cut moves back one character when it would
+   * otherwise fall between the two halves of a surrogate pair.
+   */
+  private static String truncateMessage(String message, int maxLength) {
+    int limit = Math.max(maxLength, 0);
+    if (message.length() <= limit) {
+      return message;
+    }
+    if (limit > 0
+        && Character.isHighSurrogate(message.charAt(limit - 1))
+        && Character.isLowSurrogate(message.charAt(limit))) {
+      limit--;
+    }
+    return message.substring(0, limit) + TRUNCATION_SUFFIX;
+  }
+}
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.scala b/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.scala
deleted file mode 100644
index caa6b73ab..000000000
--- a/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.scala
+++ /dev/null
@@ -1,17 +0,0 @@
-/*
- * Copyright OpenSearch Contributors
- * SPDX-License-Identifier: Apache-2.0
- */
-
-package org.opensearch.flint.core.logging
-
-// Define constants for common error messages
-object ExceptionMessages {
-  val SyntaxErrorPrefix = "Syntax error"
-  val S3ErrorPrefix = "Fail to read data from S3. Cause"
-  val GlueErrorPrefix = "Fail to read data from Glue. Cause"
-  val QueryAnalysisErrorPrefix = "Fail to analyze query. Cause"
-  val SparkExceptionErrorPrefix = "Spark exception. Cause"
-  val QueryRunErrorPrefix = "Fail to run query. Cause"
-  val GlueAccessDeniedMessage = "Access denied in AWS Glue service. Please check permissions."
-}
diff --git a/flint-core/src/test/java/org/opensearch/flint/core/logging/ExceptionMessagesTest.java b/flint-core/src/test/java/org/opensearch/flint/core/logging/ExceptionMessagesTest.java
new file mode 100644
index 000000000..b074be066
--- /dev/null
+++ b/flint-core/src/test/java/org/opensearch/flint/core/logging/ExceptionMessagesTest.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.flint.core.logging;
+
+import static org.junit.Assert.assertEquals;
+import static org.opensearch.flint.core.logging.ExceptionMessages.extractRootCause;
+
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import org.apache.spark.SparkException;
+import org.junit.Test;
+import org.opensearch.OpenSearchException;
+
+public class ExceptionMessagesTest {
+
+  @Test
+  public void testExtractRootCauseFromS3Exception() {
+    AmazonS3Exception exception = new AmazonS3Exception("test");
+    exception.setServiceName("S3");
+    exception.setStatusCode(400);
+
+    assertEquals(
+        "Fail to read data from S3. Cause: serviceName=[S3], statusCode=[400]",
+        extractRootCause(
+            new SparkException("test", exception)));
+  }
+
+  @Test
+  public void testExtractRootCauseFromOtherException() {
+    OpenSearchException exception = new OpenSearchException("Write is blocked");
+
+    assertEquals(
+        "Write is blocked",
+        extractRootCause(
+            new SparkException("test", exception)));
+  }
+
+  @Test
+  public void testExtractRootCauseFromOtherExceptionWithLongMessage() {
+    OpenSearchException exception = new OpenSearchException("Write is blocked due to cluster is readonly");
+
+    assertEquals(
+        "Write is blocked due...",
+        extractRootCause(
+            new SparkException("test", exception), 20));
+  }
+
+  @Test
+  public void testExtractRootCauseWithMessageNotExceedingMaxLength() {
+    OpenSearchException exception = new OpenSearchException("Write is blocked");
+
+    assertEquals(
+        "Write is blocked",
+        extractRootCause(
+            new SparkException("test", exception), 16));
+  }
+
+  @Test
+  public void testExtractRootCauseFromExceptionWithoutMessage() {
+    assertEquals(
+        "java.lang.IllegalStateException",
+        extractRootCause(
+            new SparkException("test", new IllegalStateException())));
+  }
+}
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
index 343299a8c..29acaea6b 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
@@ -18,6 +18,7 @@ import dev.failsafe.event.ExecutionAttemptedEvent
 import dev.failsafe.function.CheckedRunnable
 import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING}
 import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
+import org.opensearch.flint.core.logging.ExceptionMessages.extractRootCause
 import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}
 
 import org.apache.spark.internal.Logging
@@ -118,7 +119,7 @@ class FlintSparkIndexMonitor(
       } catch {
         case e: Throwable =>
           logError(s"Streaming job $name terminated with exception: ${e.getMessage}")
-          retryUpdateIndexStateToFailed(name)
+          retryUpdateIndexStateToFailed(name, exception = Some(e))
       }
     } else {
       logInfo(s"Index monitor for [$indexName] not found.")
@@ -132,7 +133,7 @@ class FlintSparkIndexMonitor(
       val name = FlintSparkIndexMonitor.indexMonitorTracker.keys.headOption
       if (name.isDefined) {
         logInfo(s"Found index name in index monitor task list: ${name.get}")
-        retryUpdateIndexStateToFailed(name.get)
+        retryUpdateIndexStateToFailed(name.get, exception = None)
       } else {
         logInfo(s"Index monitor task list is empty")
       }
@@ -195,19 +196,24 @@ class FlintSparkIndexMonitor(
   /**
    * Transition the index state to FAILED upon encountering an exception. Retry in case conflicts
    * with final transaction in scheduled task.
-   * ```
-   * TODO:
-   * 1) Determine the appropriate state code based on the type of exception encountered
-   * 2) Record and persist the error message of the root cause for further diagnostics.
-   * ```
+   *
+   * TODO: Determine the appropriate state code based on the type of exception encountered
    */
-  private def retryUpdateIndexStateToFailed(indexName: String): Unit = {
+  private def retryUpdateIndexStateToFailed(
+      indexName: String,
+      exception: Option[Throwable]): Unit = {
     logInfo(s"Updating index state to failed for $indexName")
     retry {
       flintMetadataLogService
         .startTransaction(indexName)
         .initialLog(latest => latest.state == REFRESHING)
-        .finalLog(latest => latest.copy(state = FAILED))
+        .finalLog(latest =>
+          exception match {
+            case Some(ex) =>
+              latest.copy(state = FAILED, error = extractRootCause(ex, 1000))
+            case None =>
+              latest.copy(state = FAILED)
+          })
         .commit(_ => {})
     }
   }
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
index 1e2d68b8e..ad5029fcb 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala
@@ -163,7 +163,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
     latestLog should contain("state" -> "refreshing")
   }
 
-  test("await monitor terminated with exception should update index state to failed") {
+  test("await monitor terminated with exception should update index state to failed with error") {
     new Thread(() => {
       Thread.sleep(3000L)
 
@@ -186,6 +186,8 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
-    // Assert index state is active now
+    // Assert index state is failed now
     val latestLog = latestLogEntry(testLatestId)
     latestLog should contain("state" -> "failed")
+    latestLog("error").asInstanceOf[String] should (include("OpenSearchException") and
+      include("type=cluster_block_exception"))
   }
 
   test(
@@ -199,6 +201,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
-    // Assert index state is active now
+    // Assert index state is failed now
     val latestLog = latestLogEntry(testLatestId)
     latestLog should contain("state" -> "failed")
+    latestLog should not contain "error"
   }
 
   private def getLatestTimestamp: (Long, Long) = {