Apply new logging format to record exceptions (opensearch-project#314)
Signed-off-by: Louis Chu <[email protected]>
noCharger authored May 13, 2024
1 parent 45835cb commit 422dae7
Showing 5 changed files with 78 additions and 64 deletions.
IRestHighLevelClient.java
@@ -28,6 +28,8 @@
 import org.opensearch.client.indices.PutMappingRequest;
 import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.opensearch.client.RequestOptions;
+import org.opensearch.flint.core.logging.CustomLogging;
+import org.opensearch.flint.core.logging.OperationMessage;
 import org.opensearch.flint.core.metrics.MetricsUtil;
 
 import java.io.Closeable;
@@ -90,7 +92,11 @@ static void recordOperationSuccess(String metricNamePrefix) {
   static void recordOperationFailure(String metricNamePrefix, Exception e) {
     OpenSearchException openSearchException = extractOpenSearchException(e);
     int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500;
-
+    if (openSearchException != null) {
+      CustomLogging.logError(new OperationMessage("OpenSearch Operation failed.", statusCode), openSearchException);
+    } else {
+      CustomLogging.logError("OpenSearch Operation failed with an exception.", e);
+    }
     if (statusCode == 403) {
       String forbiddenErrorMetricName = metricNamePrefix + ".403.count";
       MetricsUtil.incrementCounter(forbiddenErrorMetricName);
@@ -104,15 +110,16 @@ static void recordOperationFailure(String metricNamePrefix, Exception e) {
    * Extracts an OpenSearchException from the given Throwable.
    * Checks if the Throwable is an instance of OpenSearchException or caused by one.
    *
-   * @param ex the exception to be checked
+   * @param e the exception to be checked
    * @return the extracted OpenSearchException, or null if not found
    */
-  private static OpenSearchException extractOpenSearchException(Throwable ex) {
-    if (ex instanceof OpenSearchException) {
-      return (OpenSearchException) ex;
-    } else if (ex.getCause() instanceof OpenSearchException) {
-      return (OpenSearchException) ex.getCause();
+  static OpenSearchException extractOpenSearchException(Throwable e) {
+    if (e instanceof OpenSearchException) {
+      return (OpenSearchException) e;
+    } else if (e.getCause() == null) {
+      return null;
+    } else {
+      return extractOpenSearchException(e.getCause());
     }
-    return null;
   }
 }
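
Note: the rewritten extractOpenSearchException now walks the entire cause chain recursively instead of checking only the immediate cause, so an OpenSearchException buried several levels deep is still recovered. A minimal Scala sketch of the same idea (not part of the commit; the helper name is made up, and the usual org.opensearch.OpenSearchException class is assumed):

    import scala.annotation.tailrec
    import org.opensearch.OpenSearchException

    // Walk e.getCause() until an OpenSearchException turns up or the chain ends.
    @tailrec
    def findOpenSearchException(e: Throwable): Option[OpenSearchException] = e match {
      case ose: OpenSearchException => Some(ose)
      case _ if e.getCause == null  => None
      case _                        => findOpenSearchException(e.getCause)
    }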

CustomLogging.java
@@ -44,9 +44,9 @@ public class CustomLogging
   private static final Map<String, BiConsumer<String, Throwable>> logLevelActions = new HashMap<>();
 
   static {
-    String[] parts = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN).split(":");
+    DOMAIN_NAME = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN);
+    String[] parts = DOMAIN_NAME.split(":");
     CLIENT_ID = parts.length == 2 ? parts[0] : UNKNOWN;
-    DOMAIN_NAME = parts.length == 2 ? parts[1] : UNKNOWN;
 
     logLevelActions.put("DEBUG", logger::debug);
     logLevelActions.put("INFO", logger::info);
@@ -78,10 +78,6 @@ private static String convertToJson(Map<String, Object> logEventMap) {
    * @return A map representation of the log event.
    */
   protected static Map<String, Object> constructLogEventMap(String level, Object content, Throwable throwable) {
-    if (content == null) {
-      throw new IllegalArgumentException("Log message must not be null");
-    }
-
     Map<String, Object> logEventMap = new LinkedHashMap<>();
     Map<String, Object> body = new LinkedHashMap<>();
     constructMessageBody(content, body);
@@ -105,6 +101,11 @@ protected static Map<String, Object> constructLogEventMap(String level, Object content, Throwable throwable) {
   }
 
   private static void constructMessageBody(Object content, Map<String, Object> body) {
+    if (content == null) {
+      body.put("message", "");
+      return;
+    }
+
     if (content instanceof Message) {
       Message message = (Message) content;
       body.put("message", message.getFormattedMessage());
@@ -151,6 +152,10 @@ public static void logError(Object message) {
     log("ERROR", message, null);
   }
 
+  public static void logError(Throwable throwable) {
+    log("ERROR", "", throwable);
+  }
+
   public static void logError(Object message, Throwable throwable) {
     log("ERROR", message, throwable);
   }
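
Note: two behavioral tweaks ride along with the new logError(Throwable) overload: DOMAIN_NAME now keeps the raw FLINT_CLUSTER_NAME value (expected form "clientId:domainName") instead of only the segment after the colon, and null log content now yields an empty message rather than an IllegalArgumentException. A small Scala sketch of the parsing rule, with hypothetical values ("unknown" stands in for the class's UNKNOWN constant):

    // FLINT_CLUSTER_NAME is expected to look like "clientId:domainName".
    val Unknown = "unknown"
    val raw = sys.env.getOrElse("FLINT_CLUSTER_NAME", s"$Unknown:$Unknown")
    val clientId = raw.split(":") match {
      case Array(client, _) => client
      case _                => Unknown
    }
    val domainName = raw // e.g. "acme:prod-cluster" stays whole, not just "prod-cluster"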

FlintJob.scala
@@ -8,6 +8,7 @@ package org.apache.spark.sql
 
 import java.util.concurrent.atomic.AtomicInteger
 
+import org.opensearch.flint.core.logging.CustomLogging
 import org.opensearch.flint.core.metrics.MetricConstants
 import org.opensearch.flint.core.metrics.MetricsUtil.registerGauge
 import play.api.libs.json._
@@ -28,28 +29,17 @@ import org.apache.spark.sql.types._
  */
 object FlintJob extends Logging with FlintJobExecutor {
   def main(args: Array[String]): Unit = {
-    val (queryOption, resultIndex) = args.length match {
-      case 1 =>
-        (None, args(0)) // Starting from OS 2.13, resultIndex is the only argument
-      case 2 =>
-        (
-          Some(args(0)),
-          args(1)
-        ) // Before OS 2.13, there are two arguments, the second one is resultIndex
-      case _ =>
-        throw new IllegalArgumentException(
-          "Unsupported number of arguments. Expected 1 or 2 arguments.")
-    }
+    val (queryOption, resultIndex) = parseArgs(args)
 
     val conf = createSparkConf()
     val jobType = conf.get("spark.flint.job.type", "batch")
-    logInfo(s"""Job type is: ${jobType}""")
+    CustomLogging.logInfo(s"""Job type is: ${jobType}""")
     conf.set(FlintSparkConf.JOB_TYPE.key, jobType)
 
     val dataSource = conf.get("spark.flint.datasource.name", "")
     val query = queryOption.getOrElse(unescapeQuery(conf.get(FlintSparkConf.QUERY.key, "")))
     if (query.isEmpty) {
-      throw new IllegalArgumentException(s"Query undefined for the ${jobType} job.")
+      logAndThrow(s"Query undefined for the ${jobType} job.")
     }
     // https://github.com/opensearch-project/opensearch-spark/issues/138
     /*

FlintJobExecutor.scala
@@ -13,6 +13,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.commons.text.StringEscapeUtils.unescapeJava
 import org.opensearch.flint.core.IRestHighLevelClient
+import org.opensearch.flint.core.logging.{CustomLogging, OperationMessage}
 import org.opensearch.flint.core.metrics.MetricConstants
 import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter
 import play.api.libs.json._
@@ -433,7 +434,11 @@ trait FlintJobExecutor {
       statusCode.map(code => "StatusCode" -> code.toString)
 
     val errorJson = mapper.writeValueAsString(errorDetails)
-    logError(errorJson, e)
+
+    statusCode.foreach { code =>
+      CustomLogging.logError(new OperationMessage("", code), e)
+    }
+
     errorJson
   }
 
@@ -479,4 +484,23 @@ trait FlintJobExecutor {
       handleQueryException(r, "Fail to run query. Cause")
     }
   }
+
+  def parseArgs(args: Array[String]): (Option[String], String) = {
+    args match {
+      case Array(resultIndex) =>
+        (None, resultIndex) // Starting from OS 2.13, resultIndex is the only argument
+      case Array(query, resultIndex) =>
+        (
+          Some(query),
+          resultIndex
+        ) // Before OS 2.13, there are two arguments, the second one is resultIndex
+      case _ => logAndThrow("Unsupported number of arguments. Expected 1 or 2 arguments.")
+    }
+  }
+
+  def logAndThrow(errorMessage: String): Nothing = {
+    val t = new IllegalArgumentException(errorMessage)
+    CustomLogging.logError(t)
+    throw t
+  }
 }
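
Note: a quick usage sketch of the new shared helpers, as the FlintJob and FlintREPL call sites use them (the argument values here are made up):

    parseArgs(Array("my_result_index"))             // -> (None, "my_result_index"); OS 2.13+ calling convention
    parseArgs(Array("SELECT 1", "my_result_index")) // -> (Some("SELECT 1"), "my_result_index"); pre-2.13 convention
    parseArgs(Array())                              // logs through CustomLogging, then throws IllegalArgumentException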

FlintREPL.scala
@@ -21,6 +21,7 @@ import org.opensearch.common.Strings
 import org.opensearch.flint.app.{FlintCommand, FlintInstance}
 import org.opensearch.flint.app.FlintInstance.formats
 import org.opensearch.flint.core.FlintOptions
+import org.opensearch.flint.core.logging.CustomLogging
 import org.opensearch.flint.core.metrics.MetricConstants
 import org.opensearch.flint.core.metrics.MetricsUtil.{getTimerContext, incrementCounter, registerGauge, stopTimer}
 import org.opensearch.flint.core.storage.{FlintReader, OpenSearchUpdater}
@@ -67,7 +68,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
     val (queryOption, resultIndex) = parseArgs(args)
 
     if (Strings.isNullOrEmpty(resultIndex)) {
-      throw new IllegalArgumentException("resultIndex is not set")
+      logAndThrow("resultIndex is not set")
     }
 
     // init SparkContext
@@ -84,7 +85,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
     conf.set("spark.sql.defaultCatalog", dataSource)
 
     val jobType = conf.get(FlintSparkConf.JOB_TYPE.key, FlintSparkConf.JOB_TYPE.defaultValue.get)
-    logInfo(s"""Job type is: ${FlintSparkConf.JOB_TYPE.defaultValue.get}""")
+    CustomLogging.logInfo(s"""Job type is: ${FlintSparkConf.JOB_TYPE.defaultValue.get}""")
     conf.set(FlintSparkConf.JOB_TYPE.key, jobType)
 
     val query = getQuery(queryOption, jobType, conf)
@@ -109,10 +110,10 @@ object FlintREPL extends Logging with FlintJobExecutor {
     val sessionId: Option[String] = Option(conf.get(FlintSparkConf.SESSION_ID.key, null))
 
     if (sessionIndex.isEmpty) {
-      throw new IllegalArgumentException(FlintSparkConf.REQUEST_INDEX.key + " is not set")
+      logAndThrow(FlintSparkConf.REQUEST_INDEX.key + " is not set")
     }
     if (sessionId.isEmpty) {
-      throw new IllegalArgumentException(FlintSparkConf.SESSION_ID.key + " is not set")
+      logAndThrow(FlintSparkConf.SESSION_ID.key + " is not set")
     }
 
     val spark = createSparkSession(conf)
@@ -238,27 +239,12 @@ object FlintREPL extends Logging with FlintJobExecutor {
     }
   }
 
-  def parseArgs(args: Array[String]): (Option[String], String) = {
-    args.length match {
-      case 1 =>
-        (None, args(0)) // Starting from OS 2.13, resultIndex is the only argument
-      case 2 =>
-        (
-          Some(args(0)),
-          args(1)
-        ) // Before OS 2.13, there are two arguments, the second one is resultIndex
-      case _ =>
-        throw new IllegalArgumentException(
-          "Unsupported number of arguments. Expected 1 or 2 arguments.")
-    }
-  }
-
   def getQuery(queryOption: Option[String], jobType: String, conf: SparkConf): String = {
     queryOption.getOrElse {
       if (jobType.equalsIgnoreCase("streaming")) {
         val defaultQuery = conf.get(FlintSparkConf.QUERY.key, "")
         if (defaultQuery.isEmpty) {
-          throw new IllegalArgumentException("Query undefined for the streaming job.")
+          logAndThrow("Query undefined for the streaming job.")
         }
         unescapeQuery(defaultQuery)
       } else ""
@@ -456,7 +442,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
       sessionIndex: String,
       sessionTimerContext: Timer.Context): Unit = {
     val error = s"Session error: ${e.getMessage}"
-    logError(error, e)
+    CustomLogging.logError(error, e)
 
     val flintInstance = getExistingFlintInstance(osClient, sessionIndex, sessionId)
       .getOrElse(createFailedFlintInstance(applicationId, jobId, sessionId, jobStartTime, error))
@@ -476,7 +462,9 @@ object FlintREPL extends Logging with FlintJobExecutor {
         Option(getResponse.getSourceAsMap)
           .map(FlintInstance.deserializeFromMap)
       case Failure(exception) =>
-        logError(s"Failed to retrieve existing FlintInstance: ${exception.getMessage}", exception)
+        CustomLogging.logError(
+          s"Failed to retrieve existing FlintInstance: ${exception.getMessage}",
+          exception)
         None
       case _ => None
     }
@@ -645,7 +633,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
       // or invalid catalog (e.g., we are operating on data not defined in provided data source)
       case e: Exception =>
         val error = s"""Fail to write result of ${flintCommand}, cause: ${e.getMessage}"""
-        logError(error, e)
+        CustomLogging.logError(error, e)
         flintCommand.fail()
         updateSessionIndex(flintCommand, flintSessionIndexUpdater)
         recordStatementStateChange(flintCommand, statementTimerContext)
@@ -672,7 +660,6 @@ object FlintREPL extends Logging with FlintJobExecutor {
      * actions that require the computation of results that need to be collected or stored.
      */
     spark.sparkContext.cancelJobGroup(flintCommand.queryId)
-    logError(error)
     Some(
       handleCommandFailureAndGetFailedData(
         spark,
@@ -705,13 +692,9 @@ object FlintREPL extends Logging with FlintJobExecutor {
             queryWaitTimeMillis))
     } catch {
       case e: TimeoutException =>
-        handleCommandTimeout(
-          spark,
-          dataSource,
-          s"Executing ${flintCommand.query} timed out",
-          flintCommand,
-          sessionId,
-          startTime)
+        val error = s"Executing ${flintCommand.query} timed out"
+        CustomLogging.logError(error, e)
+        handleCommandTimeout(spark, dataSource, error, flintCommand, sessionId, startTime)
       case e: Exception =>
         val error = processQueryException(e, flintCommand)
         Some(
@@ -769,10 +752,12 @@ object FlintREPL extends Logging with FlintJobExecutor {
     } catch {
       case e: TimeoutException =>
         val error = s"Getting the mapping of index $resultIndex timed out"
+        CustomLogging.logError(error, e)
         dataToWrite =
           handleCommandTimeout(spark, dataSource, error, flintCommand, sessionId, startTime)
       case NonFatal(e) =>
         val error = s"An unexpected error occurred: ${e.getMessage}"
+        CustomLogging.logError(error, e)
         dataToWrite = Some(
           handleCommandFailureAndGetFailedData(
             spark,
@@ -1003,13 +988,13 @@ object FlintREPL extends Logging with FlintJobExecutor {
             case ie: InterruptedException =>
               // Preserve the interrupt status
               Thread.currentThread().interrupt()
-              logError("HeartBeatUpdater task was interrupted", ie)
+              CustomLogging.logError("HeartBeatUpdater task was interrupted", ie)
               incrementCounter(
                 MetricConstants.REQUEST_METADATA_HEARTBEAT_FAILED_METRIC
               ) // Record heartbeat failure metric
             // maybe due to invalid sequence number or primary term
             case e: Exception =>
-              logWarning(
+              CustomLogging.logWarning(
                 s"""Fail to update the last update time of the flint instance ${sessionId}""",
                 e)
               incrementCounter(
@@ -1069,7 +1054,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
     } catch {
       // still proceed since we are not sure what happened (e.g., OpenSearch cluster may be unresponsive)
       case e: Exception =>
-        logError(s"""Fail to find id ${sessionId} from session index.""", e)
+        CustomLogging.logError(s"""Fail to find id ${sessionId} from session index.""", e)
         true
     }
   }
@@ -1114,10 +1099,13 @@ object FlintREPL extends Logging with FlintJobExecutor {
           if e.getCause != null && e.getCause.isInstanceOf[ConnectException] =>
         retries += 1
         val delay = initialDelay * math.pow(2, retries - 1).toLong
-        logError(s"Fail to connect to OpenSearch cluster. Retrying in $delay...", e)
+        CustomLogging.logError(
+          s"Fail to connect to OpenSearch cluster. Retrying in $delay...",
+          e)
         Thread.sleep(delay.toMillis)
 
       case e: Exception =>
+        CustomLogging.logError(e)
         throw e
     }
   }
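
Note: the connection-retry loop above doubles its delay on every consecutive ConnectException. A tiny Scala sketch of the backoff schedule it produces, assuming an initialDelay of 1 second (the actual default is not shown in this diff):

    import scala.concurrent.duration._

    val initialDelay = 1.second // assumed; the real value comes from the enclosing method
    // delay = initialDelay * 2^(retries - 1), as computed in the diff above
    val delays = (1 to 3).map(retries => initialDelay * math.pow(2, retries - 1).toLong)
    // -> Vector(1 second, 2 seconds, 4 seconds)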
