Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store error message for streaming job execution in Flint metadata log #433

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.logging;

import com.amazonaws.services.s3.model.AmazonS3Exception;

/**
* Define constants for common error messages and utility methods to handle them.
*/
public class ExceptionMessages {
public static final String SyntaxErrorPrefix = "Syntax error";
public static final String S3ErrorPrefix = "Fail to read data from S3. Cause";
public static final String GlueErrorPrefix = "Fail to read data from Glue. Cause";
public static final String QueryAnalysisErrorPrefix = "Fail to analyze query. Cause";
public static final String SparkExceptionErrorPrefix = "Spark exception. Cause";
public static final String QueryRunErrorPrefix = "Fail to run query. Cause";
public static final String GlueAccessDeniedMessage = "Access denied in AWS Glue service. Please check permissions.";

/**
* Extracts the root cause from a throwable and returns a sanitized error message.
*
* @param e The throwable from which to extract the root cause.
* @param maxLength The maximum length of the returned error message. Default is 1000 characters.
* @return A sanitized and possibly truncated error message string.
*/
public static String extractRootCause(Throwable e, int maxLength) {
return truncateMessage(redactMessage(rootCause(e)), maxLength);
}

/**
* Overloads extractRootCause to provide a non-truncated error message.
*
* @param e The throwable from which to extract the root cause.
* @return A sanitized error message string.
*/
public static String extractRootCause(Throwable e) {
return redactMessage(rootCause(e));
}

private static Throwable rootCause(Throwable e) {
Throwable cause = e;
while (cause.getCause() != null && cause.getCause() != cause) {
cause = cause.getCause();
}
return cause;
}
dai-chen marked this conversation as resolved.
Show resolved Hide resolved

private static String redactMessage(Throwable cause) {
if (cause instanceof AmazonS3Exception) {
AmazonS3Exception e = (AmazonS3Exception) cause;
return String.format("%s: serviceName=[%s], statusCode=[%d]",
S3ErrorPrefix, e.getServiceName(), e.getStatusCode());
Comment on lines +54 to +55
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will have a look.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see this is relevant directly and it seems binding to Log4j API. May reconsider when we refactor the entire extractRootCause and processQueryException after #435. Thanks!

} else {
if (cause.getLocalizedMessage() != null) {
return cause.getLocalizedMessage();
} else if (cause.getMessage() != null) {
return cause.getMessage();
} else {
return cause.toString();
}
}
}

private static String truncateMessage(String message, int maxLength) {
if (message.length() > maxLength) {
return message.substring(0, maxLength) + "...";
} else {
return message;
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.logging;

import static org.junit.Assert.assertEquals;
import static org.opensearch.flint.core.logging.ExceptionMessages.extractRootCause;

import com.amazonaws.services.s3.model.AmazonS3Exception;
import org.apache.spark.SparkException;
import org.junit.Test;
import org.opensearch.OpenSearchException;

public class ExceptionMessagesTest {

@Test
public void testExtractRootCauseFromS3Exception() {
AmazonS3Exception exception = new AmazonS3Exception("test");
exception.setServiceName("S3");
exception.setStatusCode(400);

assertEquals(
"Fail to read data from S3. Cause: serviceName=[S3], statusCode=[400]",
extractRootCause(
new SparkException("test", exception)));
}

@Test
public void testExtractRootCauseFromOtherException() {
OpenSearchException exception = new OpenSearchException("Write is blocked");

assertEquals(
"Write is blocked",
extractRootCause(
new SparkException("test", exception)));
}

@Test
public void testExtractRootCauseFromOtherExceptionWithLongMessage() {
OpenSearchException exception = new OpenSearchException("Write is blocked due to cluster is readonly");

assertEquals(
"Write is blocked due...",
extractRootCause(
new SparkException("test", exception), 20));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import dev.failsafe.event.ExecutionAttemptedEvent
import dev.failsafe.function.CheckedRunnable
import org.opensearch.flint.common.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING}
import org.opensearch.flint.common.metadata.log.FlintMetadataLogService
import org.opensearch.flint.core.logging.ExceptionMessages.extractRootCause
import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}

import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -118,7 +119,7 @@ class FlintSparkIndexMonitor(
} catch {
case e: Throwable =>
logError(s"Streaming job $name terminated with exception: ${e.getMessage}")
retryUpdateIndexStateToFailed(name)
retryUpdateIndexStateToFailed(name, exception = Some(e))
}
} else {
logInfo(s"Index monitor for [$indexName] not found.")
Expand All @@ -132,7 +133,7 @@ class FlintSparkIndexMonitor(
val name = FlintSparkIndexMonitor.indexMonitorTracker.keys.headOption
if (name.isDefined) {
logInfo(s"Found index name in index monitor task list: ${name.get}")
retryUpdateIndexStateToFailed(name.get)
retryUpdateIndexStateToFailed(name.get, exception = None)
} else {
logInfo(s"Index monitor task list is empty")
}
Expand Down Expand Up @@ -195,19 +196,24 @@ class FlintSparkIndexMonitor(
/**
* Transition the index state to FAILED upon encountering an exception. Retry in case conflicts
* with final transaction in scheduled task.
* ```
* TODO:
* 1) Determine the appropriate state code based on the type of exception encountered
* 2) Record and persist the error message of the root cause for further diagnostics.
* ```
*
* TODO: Determine the appropriate state code based on the type of exception encountered
*/
private def retryUpdateIndexStateToFailed(indexName: String): Unit = {
private def retryUpdateIndexStateToFailed(
indexName: String,
exception: Option[Throwable]): Unit = {
logInfo(s"Updating index state to failed for $indexName")
retry {
flintMetadataLogService
.startTransaction(indexName)
.initialLog(latest => latest.state == REFRESHING)
.finalLog(latest => latest.copy(state = FAILED))
.finalLog(latest =>
exception match {
case Some(ex) =>
latest.copy(state = FAILED, error = extractRootCause(ex, 1000))
case None =>
latest.copy(state = FAILED)
})
.commit(_ => {})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
latestLog should contain("state" -> "refreshing")
}

test("await monitor terminated with exception should update index state to failed") {
test("await monitor terminated with exception should update index state to failed with error") {
new Thread(() => {
Thread.sleep(3000L)

Expand All @@ -186,6 +186,8 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
// Assert index state is active now
val latestLog = latestLogEntry(testLatestId)
latestLog should contain("state" -> "failed")
latestLog("error").asInstanceOf[String] should (include("OpenSearchException") and
include("type=cluster_block_exception"))
}

test(
Expand All @@ -199,6 +201,7 @@ class FlintSparkIndexMonitorITSuite extends OpenSearchTransactionSuite with Matc
// Assert index state is active now
val latestLog = latestLogEntry(testLatestId)
latestLog should contain("state" -> "failed")
latestLog should not contain "error"
}

private def getLatestTimestamp: (Long, Long) = {
Expand Down
Loading