From 957f7d3b485876e16db9497ddfb037f9a99f9101 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Mon, 15 Jul 2024 10:52:01 -0700 Subject: [PATCH 1/3] Store root cause message and add IT Signed-off-by: Chen Dai --- .../flint/spark/FlintSparkIndexMonitor.scala | 38 ++++++++++++++----- .../spark/FlintSparkIndexMonitorITSuite.scala | 5 ++- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala index 343299a8c..29f4f7ee0 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala @@ -118,7 +118,7 @@ class FlintSparkIndexMonitor( } catch { case e: Throwable => logError(s"Streaming job $name terminated with exception: ${e.getMessage}") - retryUpdateIndexStateToFailed(name) + retryUpdateIndexStateToFailed(name, exception = Some(e)) } } else { logInfo(s"Index monitor for [$indexName] not found.") @@ -132,7 +132,7 @@ class FlintSparkIndexMonitor( val name = FlintSparkIndexMonitor.indexMonitorTracker.keys.headOption if (name.isDefined) { logInfo(s"Found index name in index monitor task list: ${name.get}") - retryUpdateIndexStateToFailed(name.get) + retryUpdateIndexStateToFailed(name.get, exception = None) } else { logInfo(s"Index monitor task list is empty") } @@ -195,19 +195,24 @@ class FlintSparkIndexMonitor( /** * Transition the index state to FAILED upon encountering an exception. Retry in case conflicts * with final transaction in scheduled task. - * ``` - * TODO: - * 1) Determine the appropriate state code based on the type of exception encountered - * 2) Record and persist the error message of the root cause for further diagnostics. - * ``` + * + * TODO: Determine the appropriate state code based on the type of exception encountered */ - private def retryUpdateIndexStateToFailed(indexName: String): Unit = { + private def retryUpdateIndexStateToFailed( + indexName: String, + exception: Option[Throwable]): Unit = { logInfo(s"Updating index state to failed for $indexName") retry { flintMetadataLogService .startTransaction(indexName) .initialLog(latest => latest.state == REFRESHING) - .finalLog(latest => latest.copy(state = FAILED)) + .finalLog(latest => + exception match { + case Some(ex) => + latest.copy(state = FAILED, error = extractRootCause(ex)) + case None => + latest.copy(state = FAILED) + }) .commit(_ => {}) } } @@ -231,6 +236,21 @@ class FlintSparkIndexMonitor( override def run(): Unit = operation }) } + + private def extractRootCause(e: Throwable): String = { + var cause = e + while (cause.getCause != null && cause.getCause != cause) { + cause = cause.getCause + } + + if (cause.getLocalizedMessage != null) { + return cause.getLocalizedMessage + } + if (cause.getMessage != null) { + return cause.getMessage + } + cause.toString + } } object FlintSparkIndexMonitor extends Logging { diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala index 1e2d68b8e..ad5029fcb 100644 --- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala +++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkIndexMonitorITSuite.scala @@ -163,7 +163,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc latestLog should contain("state" -> "refreshing") } - test("await monitor terminated with exception should update index state to failed") { + test("await monitor terminated with exception should update index state to failed with error") { new Thread(() => { Thread.sleep(3000L) @@ -186,6 +186,8 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc // Assert index state is active now val latestLog = latestLogEntry(testLatestId) latestLog should contain("state" -> "failed") + latestLog("error").asInstanceOf[String] should (include("OpenSearchException") and + include("type=cluster_block_exception")) } test( @@ -199,6 +201,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc // Assert index state is active now val latestLog = latestLogEntry(testLatestId) latestLog should contain("state" -> "failed") + latestLog should not contain "error" } private def getLatestTimestamp: (Long, Long) = { From 094dd8c2df2825e09938b990a6ce70a67e9dac6c Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 16 Jul 2024 10:28:15 -0700 Subject: [PATCH 2/3] Move extract root cause to ErrorMessages Signed-off-by: Chen Dai --- .../core/logging/ExceptionMessages.scala | 45 +++++++++++++++++++ .../flint/spark/FlintSparkIndexMonitor.scala | 16 +------ 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.scala b/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.scala index caa6b73ab..71bb729f5 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.scala +++ b/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.scala @@ -5,6 +5,8 @@ package org.opensearch.flint.core.logging +import com.amazonaws.services.s3.model.AmazonS3Exception + // Define constants for common error messages object ExceptionMessages { val SyntaxErrorPrefix = "Syntax error" @@ -14,4 +16,47 @@ object ExceptionMessages { val SparkExceptionErrorPrefix = "Spark exception. Cause" val QueryRunErrorPrefix = "Fail to run query. Cause" val GlueAccessDeniedMessage = "Access denied in AWS Glue service. Please check permissions." + + /** + * Extracts the root cause from a throwable and returns a sanitized error message. + * + * @param e + * The throwable from which to extract the root cause. + * @param maxLength + * The maximum length of the returned error message. Default is 100 characters. + * @return + * A sanitized and possibly truncated error message string. + */ + def extractRootCause(e: Throwable, maxLength: Int = 1000): String = { + truncateMessage(redactMessage(rootCause(e)), maxLength) + } + + private def rootCause(e: Throwable): Throwable = { + var cause = e + while (cause.getCause != null && cause.getCause != cause) { + cause = cause.getCause + } + cause + } + + private def redactMessage(cause: Throwable): String = cause match { + case e: AmazonS3Exception => + s"$S3ErrorPrefix: serviceName=[${e.getServiceName}], statusCode=[${e.getStatusCode}]" + case _ => + if (cause.getLocalizedMessage != null) { + cause.getLocalizedMessage + } else if (cause.getMessage != null) { + cause.getMessage + } else { + cause.toString + } + } + + private def truncateMessage(message: String, maxLength: Int): String = { + if (message.length > maxLength) { + message.take(maxLength) + "..." + } else { + message + } + } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala index 29f4f7ee0..99997d931 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala @@ -18,6 +18,7 @@ import dev.failsafe.event.ExecutionAttemptedEvent import dev.failsafe.function.CheckedRunnable import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING} import org.opensearch.flint.common.metadata.log.FlintMetadataLogService +import org.opensearch.flint.core.logging.ExceptionMessages.extractRootCause import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil} import org.apache.spark.internal.Logging @@ -236,21 +237,6 @@ class FlintSparkIndexMonitor( override def run(): Unit = operation }) } - - private def extractRootCause(e: Throwable): String = { - var cause = e - while (cause.getCause != null && cause.getCause != cause) { - cause = cause.getCause - } - - if (cause.getLocalizedMessage != null) { - return cause.getLocalizedMessage - } - if (cause.getMessage != null) { - return cause.getMessage - } - cause.toString - } } object FlintSparkIndexMonitor extends Logging { From 99f75b889806d15832114e648171566a25a8fb13 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Tue, 16 Jul 2024 11:22:09 -0700 Subject: [PATCH 3/3] Change ErrorMessages to Java and add UT Signed-off-by: Chen Dai --- .../flint/core/logging/ExceptionMessages.java | 74 +++++++++++++++++++ .../core/logging/ExceptionMessages.scala | 62 ---------------- .../core/logging/ExceptionMessagesTest.java | 49 ++++++++++++ .../flint/spark/FlintSparkIndexMonitor.scala | 2 +- 4 files changed, 124 insertions(+), 63 deletions(-) create mode 100644 flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.java delete mode 100644 flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.scala create mode 100644 flint-core/src/test/java/org/opensearch/flint/core/logging/ExceptionMessagesTest.java diff --git a/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.java b/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.java new file mode 100644 index 000000000..34f5dd257 --- /dev/null +++ b/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.java @@ -0,0 +1,74 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.logging; + +import com.amazonaws.services.s3.model.AmazonS3Exception; + +/** + * Define constants for common error messages and utility methods to handle them. + */ +public class ExceptionMessages { + public static final String SyntaxErrorPrefix = "Syntax error"; + public static final String S3ErrorPrefix = "Fail to read data from S3. Cause"; + public static final String GlueErrorPrefix = "Fail to read data from Glue. Cause"; + public static final String QueryAnalysisErrorPrefix = "Fail to analyze query. Cause"; + public static final String SparkExceptionErrorPrefix = "Spark exception. Cause"; + public static final String QueryRunErrorPrefix = "Fail to run query. Cause"; + public static final String GlueAccessDeniedMessage = "Access denied in AWS Glue service. Please check permissions."; + + /** + * Extracts the root cause from a throwable and returns a sanitized error message. + * + * @param e The throwable from which to extract the root cause. + * @param maxLength The maximum length of the returned error message. Default is 1000 characters. + * @return A sanitized and possibly truncated error message string. + */ + public static String extractRootCause(Throwable e, int maxLength) { + return truncateMessage(redactMessage(rootCause(e)), maxLength); + } + + /** + * Overloads extractRootCause to provide a non-truncated error message. + * + * @param e The throwable from which to extract the root cause. + * @return A sanitized error message string. + */ + public static String extractRootCause(Throwable e) { + return redactMessage(rootCause(e)); + } + + private static Throwable rootCause(Throwable e) { + Throwable cause = e; + while (cause.getCause() != null && cause.getCause() != cause) { + cause = cause.getCause(); + } + return cause; + } + + private static String redactMessage(Throwable cause) { + if (cause instanceof AmazonS3Exception) { + AmazonS3Exception e = (AmazonS3Exception) cause; + return String.format("%s: serviceName=[%s], statusCode=[%d]", + S3ErrorPrefix, e.getServiceName(), e.getStatusCode()); + } else { + if (cause.getLocalizedMessage() != null) { + return cause.getLocalizedMessage(); + } else if (cause.getMessage() != null) { + return cause.getMessage(); + } else { + return cause.toString(); + } + } + } + + private static String truncateMessage(String message, int maxLength) { + if (message.length() > maxLength) { + return message.substring(0, maxLength) + "..."; + } else { + return message; + } + } +} diff --git a/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.scala b/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.scala deleted file mode 100644 index 71bb729f5..000000000 --- a/flint-core/src/main/java/org/opensearch/flint/core/logging/ExceptionMessages.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.flint.core.logging - -import com.amazonaws.services.s3.model.AmazonS3Exception - -// Define constants for common error messages -object ExceptionMessages { - val SyntaxErrorPrefix = "Syntax error" - val S3ErrorPrefix = "Fail to read data from S3. Cause" - val GlueErrorPrefix = "Fail to read data from Glue. Cause" - val QueryAnalysisErrorPrefix = "Fail to analyze query. Cause" - val SparkExceptionErrorPrefix = "Spark exception. Cause" - val QueryRunErrorPrefix = "Fail to run query. Cause" - val GlueAccessDeniedMessage = "Access denied in AWS Glue service. Please check permissions." - - /** - * Extracts the root cause from a throwable and returns a sanitized error message. - * - * @param e - * The throwable from which to extract the root cause. - * @param maxLength - * The maximum length of the returned error message. Default is 100 characters. - * @return - * A sanitized and possibly truncated error message string. - */ - def extractRootCause(e: Throwable, maxLength: Int = 1000): String = { - truncateMessage(redactMessage(rootCause(e)), maxLength) - } - - private def rootCause(e: Throwable): Throwable = { - var cause = e - while (cause.getCause != null && cause.getCause != cause) { - cause = cause.getCause - } - cause - } - - private def redactMessage(cause: Throwable): String = cause match { - case e: AmazonS3Exception => - s"$S3ErrorPrefix: serviceName=[${e.getServiceName}], statusCode=[${e.getStatusCode}]" - case _ => - if (cause.getLocalizedMessage != null) { - cause.getLocalizedMessage - } else if (cause.getMessage != null) { - cause.getMessage - } else { - cause.toString - } - } - - private def truncateMessage(message: String, maxLength: Int): String = { - if (message.length > maxLength) { - message.take(maxLength) + "..." - } else { - message - } - } -} diff --git a/flint-core/src/test/java/org/opensearch/flint/core/logging/ExceptionMessagesTest.java b/flint-core/src/test/java/org/opensearch/flint/core/logging/ExceptionMessagesTest.java new file mode 100644 index 000000000..b074be066 --- /dev/null +++ b/flint-core/src/test/java/org/opensearch/flint/core/logging/ExceptionMessagesTest.java @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.flint.core.logging; + +import static org.junit.Assert.assertEquals; +import static org.opensearch.flint.core.logging.ExceptionMessages.extractRootCause; + +import com.amazonaws.services.s3.model.AmazonS3Exception; +import org.apache.spark.SparkException; +import org.junit.Test; +import org.opensearch.OpenSearchException; + +public class ExceptionMessagesTest { + + @Test + public void testExtractRootCauseFromS3Exception() { + AmazonS3Exception exception = new AmazonS3Exception("test"); + exception.setServiceName("S3"); + exception.setStatusCode(400); + + assertEquals( + "Fail to read data from S3. Cause: serviceName=[S3], statusCode=[400]", + extractRootCause( + new SparkException("test", exception))); + } + + @Test + public void testExtractRootCauseFromOtherException() { + OpenSearchException exception = new OpenSearchException("Write is blocked"); + + assertEquals( + "Write is blocked", + extractRootCause( + new SparkException("test", exception))); + } + + @Test + public void testExtractRootCauseFromOtherExceptionWithLongMessage() { + OpenSearchException exception = new OpenSearchException("Write is blocked due to cluster is readonly"); + + assertEquals( + "Write is blocked due...", + extractRootCause( + new SparkException("test", exception), 20)); + } +} \ No newline at end of file diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala index 99997d931..29acaea6b 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala @@ -210,7 +210,7 @@ class FlintSparkIndexMonitor( .finalLog(latest => exception match { case Some(ex) => - latest.copy(state = FAILED, error = extractRootCause(ex)) + latest.copy(state = FAILED, error = extractRootCause(ex, 1000)) case None => latest.copy(state = FAILED) })