From 422dae7c58cb859ec6e860ccb485c7bddccaa802 Mon Sep 17 00:00:00 2001
From: Louis Chu
Date: Mon, 13 May 2024 16:55:26 -0700
Subject: [PATCH] Apply new logging format to record exceptions (#314)

Signed-off-by: Louis Chu
---
 .../flint/core/IRestHighLevelClient.java      | 23 +++++---
 .../flint/core/logging/CustomLogging.java     | 17 ++++--
 .../scala/org/apache/spark/sql/FlintJob.scala | 18 ++----
 .../apache/spark/sql/FlintJobExecutor.scala   | 26 ++++++++-
 .../org/apache/spark/sql/FlintREPL.scala      | 58 ++++++++-----------
 5 files changed, 78 insertions(+), 64 deletions(-)

diff --git a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java
index 12a5646f3..5c1080f8c 100644
--- a/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java
+++ b/flint-core/src/main/java/org/opensearch/flint/core/IRestHighLevelClient.java
@@ -28,6 +28,8 @@ import org.opensearch.client.indices.PutMappingRequest;
 import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.opensearch.client.RequestOptions;
+import org.opensearch.flint.core.logging.CustomLogging;
+import org.opensearch.flint.core.logging.OperationMessage;
 import org.opensearch.flint.core.metrics.MetricsUtil;
 
 import java.io.Closeable;
@@ -90,7 +92,11 @@ static void recordOperationSuccess(String metricNamePrefix) {
     static void recordOperationFailure(String metricNamePrefix, Exception e) {
         OpenSearchException openSearchException = extractOpenSearchException(e);
         int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500;
-
+        if (openSearchException != null) {
+            CustomLogging.logError(new OperationMessage("OpenSearch Operation failed.", statusCode), openSearchException);
+        } else {
+            CustomLogging.logError("OpenSearch Operation failed with an exception.", e);
+        }
         if (statusCode == 403) {
             String forbiddenErrorMetricName = metricNamePrefix + ".403.count";
             MetricsUtil.incrementCounter(forbiddenErrorMetricName);
@@ -104,15 +110,16 @@ static void recordOperationFailure(String metricNamePrefix, Exception e) {
      * Extracts an OpenSearchException from the given Throwable.
      * Checks if the Throwable is an instance of OpenSearchException or caused by one.
      *
-     * @param ex the exception to be checked
+     * @param e the exception to be checked
      * @return the extracted OpenSearchException, or null if not found
      */
-    private static OpenSearchException extractOpenSearchException(Throwable ex) {
-        if (ex instanceof OpenSearchException) {
-            return (OpenSearchException) ex;
-        } else if (ex.getCause() instanceof OpenSearchException) {
-            return (OpenSearchException) ex.getCause();
+    static OpenSearchException extractOpenSearchException(Throwable e) {
+        if (e instanceof OpenSearchException) {
+            return (OpenSearchException) e;
+        } else if (e.getCause() == null) {
+            return null;
+        } else {
+            return extractOpenSearchException(e.getCause());
         }
-        return null;
     }
 }
diff --git a/flint-core/src/main/java/org/opensearch/flint/core/logging/CustomLogging.java b/flint-core/src/main/java/org/opensearch/flint/core/logging/CustomLogging.java
index 8908e763b..d79147ae5 100644
--- a/flint-core/src/main/java/org/opensearch/flint/core/logging/CustomLogging.java
+++ b/flint-core/src/main/java/org/opensearch/flint/core/logging/CustomLogging.java
@@ -44,9 +44,9 @@ public class CustomLogging {
     private static final Map<String, BiConsumer<String, Throwable>> logLevelActions = new HashMap<>();
 
     static {
-        String[] parts = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN).split(":");
+        DOMAIN_NAME = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN);
+        String[] parts = DOMAIN_NAME.split(":");
         CLIENT_ID = parts.length == 2 ? parts[0] : UNKNOWN;
-        DOMAIN_NAME = parts.length == 2 ? parts[1] : UNKNOWN;
 
         logLevelActions.put("DEBUG", logger::debug);
         logLevelActions.put("INFO", logger::info);
@@ -78,10 +78,6 @@ private static String convertToJson(Map<String, Object> logEventMap) {
      * @return A map representation of the log event.
      */
     protected static Map<String, Object> constructLogEventMap(String level, Object content, Throwable throwable) {
-        if (content == null) {
-            throw new IllegalArgumentException("Log message must not be null");
-        }
-
         Map<String, Object> logEventMap = new LinkedHashMap<>();
         Map<String, Object> body = new LinkedHashMap<>();
         constructMessageBody(content, body);
@@ -105,6 +101,11 @@ protected static Map<String, Object> constructLogEventMap(String level, Object c
     }
 
     private static void constructMessageBody(Object content, Map<String, Object> body) {
+        if (content == null) {
+            body.put("message", "");
+            return;
+        }
+
         if (content instanceof Message) {
             Message message = (Message) content;
             body.put("message", message.getFormattedMessage());
@@ -151,6 +152,10 @@ public static void logError(Object message) {
         log("ERROR", message, null);
     }
 
+    public static void logError(Throwable throwable) {
+        log("ERROR", "", throwable);
+    }
+
     public static void logError(Object message, Throwable throwable) {
         log("ERROR", message, throwable);
     }
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
index f582f9f45..bba999110 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala
@@ -8,6 +8,7 @@ package org.apache.spark.sql
 
 import java.util.concurrent.atomic.AtomicInteger
 
+import org.opensearch.flint.core.logging.CustomLogging
 import org.opensearch.flint.core.metrics.MetricConstants
 import org.opensearch.flint.core.metrics.MetricsUtil.registerGauge
 import play.api.libs.json._
@@ -28,28 +29,17 @@ import org.apache.spark.sql.types._
  */
 object FlintJob extends Logging with FlintJobExecutor {
   def main(args: Array[String]): Unit = {
-    val (queryOption, resultIndex) =
-      args.length match {
-        case 1 =>
-          (None, args(0)) // Starting from OS 2.13, resultIndex is the only argument
-        case 2 =>
-          (
-            Some(args(0)),
-            args(1)
-          ) // Before OS 2.13, there are two arguments, the second one is resultIndex
-        case _ =>
-          throw new IllegalArgumentException(
-            "Unsupported number of arguments. Expected 1 or 2 arguments.")
-      }
+    val (queryOption, resultIndex) = parseArgs(args)
 
     val conf = createSparkConf()
     val jobType = conf.get("spark.flint.job.type", "batch")
-    logInfo(s"""Job type is: ${jobType}""")
+    CustomLogging.logInfo(s"""Job type is: ${jobType}""")
     conf.set(FlintSparkConf.JOB_TYPE.key, jobType)
 
     val dataSource = conf.get("spark.flint.datasource.name", "")
     val query = queryOption.getOrElse(unescapeQuery(conf.get(FlintSparkConf.QUERY.key, "")))
     if (query.isEmpty) {
-      throw new IllegalArgumentException(s"Query undefined for the ${jobType} job.")
+      logAndThrow(s"Query undefined for the ${jobType} job.")
     }
     // https://github.com/opensearch-project/opensearch-spark/issues/138
     /*
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
index fcb2ab56c..665ec5a27 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala
@@ -13,6 +13,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.commons.text.StringEscapeUtils.unescapeJava
 import org.opensearch.flint.core.IRestHighLevelClient
+import org.opensearch.flint.core.logging.{CustomLogging, OperationMessage}
 import org.opensearch.flint.core.metrics.MetricConstants
 import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter
 import play.api.libs.json._
@@ -433,7 +434,11 @@ trait FlintJobExecutor {
         statusCode.map(code => "StatusCode" -> code.toString)
 
     val errorJson = mapper.writeValueAsString(errorDetails)
-    logError(errorJson, e)
+
+    statusCode.foreach { code =>
+      CustomLogging.logError(new OperationMessage("", code), e)
+    }
+
     errorJson
   }
 
@@ -479,4 +484,23 @@ trait FlintJobExecutor {
       handleQueryException(r, "Fail to run query. Cause")
     }
   }
+
+  def parseArgs(args: Array[String]): (Option[String], String) = {
+    args match {
+      case Array(resultIndex) =>
+        (None, resultIndex) // Starting from OS 2.13, resultIndex is the only argument
+      case Array(query, resultIndex) =>
+        (
+          Some(query),
+          resultIndex
+        ) // Before OS 2.13, there are two arguments, the second one is resultIndex
+      case _ => logAndThrow("Unsupported number of arguments. Expected 1 or 2 arguments.")
+    }
+  }
+
+  def logAndThrow(errorMessage: String): Nothing = {
+    val t = new IllegalArgumentException(errorMessage)
+    CustomLogging.logError(t)
+    throw t
+  }
 }
diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
index 1adb592c8..36432f016 100644
--- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
+++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala
@@ -21,6 +21,7 @@ import org.opensearch.common.Strings
 import org.opensearch.flint.app.{FlintCommand, FlintInstance}
 import org.opensearch.flint.app.FlintInstance.formats
 import org.opensearch.flint.core.FlintOptions
+import org.opensearch.flint.core.logging.CustomLogging
 import org.opensearch.flint.core.metrics.MetricConstants
 import org.opensearch.flint.core.metrics.MetricsUtil.{getTimerContext, incrementCounter, registerGauge, stopTimer}
 import org.opensearch.flint.core.storage.{FlintReader, OpenSearchUpdater}
@@ -67,7 +68,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
     val (queryOption, resultIndex) = parseArgs(args)
 
     if (Strings.isNullOrEmpty(resultIndex)) {
-      throw new IllegalArgumentException("resultIndex is not set")
+      logAndThrow("resultIndex is not set")
     }
 
     // init SparkContext
@@ -84,7 +85,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
     conf.set("spark.sql.defaultCatalog", dataSource)
 
     val jobType = conf.get(FlintSparkConf.JOB_TYPE.key, FlintSparkConf.JOB_TYPE.defaultValue.get)
-    logInfo(s"""Job type is: ${FlintSparkConf.JOB_TYPE.defaultValue.get}""")
+    CustomLogging.logInfo(s"""Job type is: ${FlintSparkConf.JOB_TYPE.defaultValue.get}""")
     conf.set(FlintSparkConf.JOB_TYPE.key, jobType)
 
     val query = getQuery(queryOption, jobType, conf)
@@ -109,10 +110,10 @@ object FlintREPL extends Logging with FlintJobExecutor {
       val sessionId: Option[String] = Option(conf.get(FlintSparkConf.SESSION_ID.key, null))
 
       if (sessionIndex.isEmpty) {
-        throw new IllegalArgumentException(FlintSparkConf.REQUEST_INDEX.key + " is not set")
+        logAndThrow(FlintSparkConf.REQUEST_INDEX.key + " is not set")
       }
       if (sessionId.isEmpty) {
-        throw new IllegalArgumentException(FlintSparkConf.SESSION_ID.key + " is not set")
+        logAndThrow(FlintSparkConf.SESSION_ID.key + " is not set")
       }
 
       val spark = createSparkSession(conf)
@@ -238,27 +239,12 @@ object FlintREPL extends Logging with FlintJobExecutor {
     }
   }
 
-  def parseArgs(args: Array[String]): (Option[String], String) = {
-    args.length match {
-      case 1 =>
-        (None, args(0)) // Starting from OS 2.13, resultIndex is the only argument
-      case 2 =>
-        (
-          Some(args(0)),
-          args(1)
-        ) // Before OS 2.13, there are two arguments, the second one is resultIndex
-      case _ =>
-        throw new IllegalArgumentException(
-          "Unsupported number of arguments. Expected 1 or 2 arguments.")
-    }
-  }
-
   def getQuery(queryOption: Option[String], jobType: String, conf: SparkConf): String = {
     queryOption.getOrElse {
       if (jobType.equalsIgnoreCase("streaming")) {
         val defaultQuery = conf.get(FlintSparkConf.QUERY.key, "")
         if (defaultQuery.isEmpty) {
-          throw new IllegalArgumentException("Query undefined for the streaming job.")
+          logAndThrow("Query undefined for the streaming job.")
         }
         unescapeQuery(defaultQuery)
       } else ""
@@ -456,7 +442,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
       sessionIndex: String,
       sessionTimerContext: Timer.Context): Unit = {
     val error = s"Session error: ${e.getMessage}"
-    logError(error, e)
+    CustomLogging.logError(error, e)
 
     val flintInstance = getExistingFlintInstance(osClient, sessionIndex, sessionId)
       .getOrElse(createFailedFlintInstance(applicationId, jobId, sessionId, jobStartTime, error))
@@ -476,7 +462,9 @@ object FlintREPL extends Logging with FlintJobExecutor {
         Option(getResponse.getSourceAsMap)
           .map(FlintInstance.deserializeFromMap)
       case Failure(exception) =>
-        logError(s"Failed to retrieve existing FlintInstance: ${exception.getMessage}", exception)
+        CustomLogging.logError(
+          s"Failed to retrieve existing FlintInstance: ${exception.getMessage}",
+          exception)
         None
       case _ => None
     }
@@ -645,7 +633,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
       // or invalid catalog (e.g., we are operating on data not defined in provided data source)
       case e: Exception =>
         val error = s"""Fail to write result of ${flintCommand}, cause: ${e.getMessage}"""
-        logError(error, e)
+        CustomLogging.logError(error, e)
         flintCommand.fail()
         updateSessionIndex(flintCommand, flintSessionIndexUpdater)
         recordStatementStateChange(flintCommand, statementTimerContext)
@@ -672,7 +660,6 @@ object FlintREPL extends Logging with FlintJobExecutor {
      * actions that require the computation of results that need to be collected or stored.
      */
     spark.sparkContext.cancelJobGroup(flintCommand.queryId)
-    logError(error)
     Some(
       handleCommandFailureAndGetFailedData(
         spark,
@@ -705,13 +692,9 @@ object FlintREPL extends Logging with FlintJobExecutor {
           queryWaitTimeMillis))
     } catch {
       case e: TimeoutException =>
-        handleCommandTimeout(
-          spark,
-          dataSource,
-          s"Executing ${flintCommand.query} timed out",
-          flintCommand,
-          sessionId,
-          startTime)
+        val error = s"Executing ${flintCommand.query} timed out"
+        CustomLogging.logError(error, e)
+        handleCommandTimeout(spark, dataSource, error, flintCommand, sessionId, startTime)
       case e: Exception =>
         val error = processQueryException(e, flintCommand)
         Some(
@@ -769,10 +752,12 @@ object FlintREPL extends Logging with FlintJobExecutor {
     } catch {
       case e: TimeoutException =>
         val error = s"Getting the mapping of index $resultIndex timed out"
+        CustomLogging.logError(error, e)
         dataToWrite =
           handleCommandTimeout(spark, dataSource, error, flintCommand, sessionId, startTime)
       case NonFatal(e) =>
         val error = s"An unexpected error occurred: ${e.getMessage}"
+        CustomLogging.logError(error, e)
         dataToWrite = Some(
           handleCommandFailureAndGetFailedData(
             spark,
@@ -1003,13 +988,13 @@ object FlintREPL extends Logging with FlintJobExecutor {
           case ie: InterruptedException =>
             // Preserve the interrupt status
             Thread.currentThread().interrupt()
-            logError("HeartBeatUpdater task was interrupted", ie)
+            CustomLogging.logError("HeartBeatUpdater task was interrupted", ie)
             incrementCounter(
               MetricConstants.REQUEST_METADATA_HEARTBEAT_FAILED_METRIC
             ) // Record heartbeat failure metric
           // maybe due to invalid sequence number or primary term
           case e: Exception =>
-            logWarning(
+            CustomLogging.logWarning(
               s"""Fail to update the last update time of the flint instance ${sessionId}""",
               e)
             incrementCounter(
@@ -1069,7 +1054,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
     } catch {
       // still proceed since we are not sure what happened (e.g., OpenSearch cluster may be unresponsive)
       case e: Exception =>
-        logError(s"""Fail to find id ${sessionId} from session index.""", e)
+        CustomLogging.logError(s"""Fail to find id ${sessionId} from session index.""", e)
         true
     }
   }
@@ -1114,10 +1099,13 @@ object FlintREPL extends Logging with FlintJobExecutor {
             if e.getCause != null && e.getCause.isInstanceOf[ConnectException] =>
           retries += 1
           val delay = initialDelay * math.pow(2, retries - 1).toLong
-          logError(s"Fail to connect to OpenSearch cluster. Retrying in $delay...", e)
+          CustomLogging.logError(
+            s"Fail to connect to OpenSearch cluster. Retrying in $delay...",
+            e)
           Thread.sleep(delay.toMillis)
 
         case e: Exception =>
+          CustomLogging.logError(e)
           throw e
       }
     }
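
Illustrative sketch (not part of the patch above): the key behavioral change in IRestHighLevelClient.extractOpenSearchException is that it now walks the entire cause chain recursively instead of inspecting only the immediate cause. The standalone snippet below mirrors that logic; CauseChainDemo and FakeOpenSearchException are hypothetical stand-ins so it compiles and runs without the OpenSearch client on the classpath.

public class CauseChainDemo {
    // Stand-in for org.opensearch.OpenSearchException, used only for this demo.
    static class FakeOpenSearchException extends RuntimeException {
        FakeOpenSearchException(String msg) { super(msg); }
    }

    // Mirrors the patched logic: return the first matching exception found anywhere
    // in the cause chain, or null if the chain contains none.
    static FakeOpenSearchException extract(Throwable e) {
        if (e instanceof FakeOpenSearchException) {
            return (FakeOpenSearchException) e;
        } else if (e.getCause() == null) {
            return null;
        } else {
            return extract(e.getCause());
        }
    }

    public static void main(String[] args) {
        // Nested two levels deep; an instanceof check on the direct cause alone would miss it.
        Throwable wrapped = new RuntimeException("outer",
            new IllegalStateException("middle", new FakeOpenSearchException("root failure")));
        System.out.println(extract(wrapped));                      // prints the fake root exception
        System.out.println(extract(new RuntimeException("none"))); // prints null
    }
}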