
Commit

Apply new logging format to record exceptions
noCharger committed May 13, 2024
1 parent 45835cb commit 53be648
Showing 4 changed files with 40 additions and 38 deletions.
CustomLogging.java
@@ -44,9 +44,9 @@ public class CustomLogging {
     private static final Map<String, BiConsumer<String, Throwable>> logLevelActions = new HashMap<>();
 
     static {
-        String[] parts = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN).split(":");
+        DOMAIN_NAME = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN);
+        String[] parts = DOMAIN_NAME.split(":");
         CLIENT_ID = parts.length == 2 ? parts[0] : UNKNOWN;
-        DOMAIN_NAME = parts.length == 2 ? parts[1] : UNKNOWN;
 
         logLevelActions.put("DEBUG", logger::debug);
         logLevelActions.put("INFO", logger::info);
@@ -78,10 +78,6 @@ private static String convertToJson(Map<String, Object> logEventMap) {
      * @return A map representation of the log event.
      */
     protected static Map<String, Object> constructLogEventMap(String level, Object content, Throwable throwable) {
-        if (content == null) {
-            throw new IllegalArgumentException("Log message must not be null");
-        }
-
         Map<String, Object> logEventMap = new LinkedHashMap<>();
         Map<String, Object> body = new LinkedHashMap<>();
         constructMessageBody(content, body);
@@ -105,6 +101,11 @@ protected static Map<String, Object> constructLogEventMap(String level, Object content, Throwable throwable) {
     }
 
     private static void constructMessageBody(Object content, Map<String, Object> body) {
+        if (content == null) {
+            body.put("message", "");
+            return;
+        }
+
         if (content instanceof Message) {
             Message message = (Message) content;
             body.put("message", message.getFormattedMessage());
@@ -151,6 +152,10 @@ public static void logError(Object message) {
         log("ERROR", message, null);
     }
 
+    public static void logError(Throwable throwable) {
+        log("ERROR", "", throwable);
+    }
+
     public static void logError(Object message, Throwable throwable) {
         log("ERROR", message, throwable);
     }
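
The new Throwable-only overload lets callers record an exception without composing a message first; the message portion stays empty while the throwable is still handed to the logger. A minimal usage sketch from the Scala side (exception and message text are placeholders):

    import org.opensearch.flint.core.logging.CustomLogging

    val cause = new IllegalArgumentException("Query undefined for the streaming job.")
    CustomLogging.logError(cause)                           // exception only, empty message body
    CustomLogging.logError("Failed to submit query", cause) // message plus exception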
FlintJob.scala
@@ -8,11 +8,13 @@ package org.apache.spark.sql
 
 import java.util.concurrent.atomic.AtomicInteger
 
+import org.opensearch.flint.core.logging.CustomLogging
 import org.opensearch.flint.core.metrics.MetricConstants
 import org.opensearch.flint.core.metrics.MetricsUtil.registerGauge
 import play.api.libs.json._
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.FlintREPL.parseArgs
 import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.types._
 
@@ -28,28 +30,19 @@ import org.apache.spark.sql.types._
  */
 object FlintJob extends Logging with FlintJobExecutor {
   def main(args: Array[String]): Unit = {
-    val (queryOption, resultIndex) = args.length match {
-      case 1 =>
-        (None, args(0)) // Starting from OS 2.13, resultIndex is the only argument
-      case 2 =>
-        (
-          Some(args(0)),
-          args(1)
-        ) // Before OS 2.13, there are two arguments, the second one is resultIndex
-      case _ =>
-        throw new IllegalArgumentException(
-          "Unsupported number of arguments. Expected 1 or 2 arguments.")
-    }
+    val (queryOption, resultIndex) = parseArgs(args)
 
     val conf = createSparkConf()
     val jobType = conf.get("spark.flint.job.type", "batch")
-    logInfo(s"""Job type is: ${jobType}""")
+    CustomLogging.logInfo(s"""Job type is: ${jobType}""")
     conf.set(FlintSparkConf.JOB_TYPE.key, jobType)
 
     val dataSource = conf.get("spark.flint.datasource.name", "")
     val query = queryOption.getOrElse(unescapeQuery(conf.get(FlintSparkConf.QUERY.key, "")))
     if (query.isEmpty) {
-      throw new IllegalArgumentException(s"Query undefined for the ${jobType} job.")
+      val t = new IllegalArgumentException(s"Query undefined for the ${jobType} job.")
+      CustomLogging.logError(t)
+      throw t
     }
     // https://github.com/opensearch-project/opensearch-spark/issues/138
     /*
FlintJobExecutor.scala
@@ -13,6 +13,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.commons.text.StringEscapeUtils.unescapeJava
 import org.opensearch.flint.core.IRestHighLevelClient
+import org.opensearch.flint.core.logging.{CustomLogging, OperationMessage}
 import org.opensearch.flint.core.metrics.MetricConstants
 import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter
 import play.api.libs.json._
@@ -422,7 +423,7 @@ trait FlintJobExecutor {
         CleanerFactory.cleaner(streaming))
   }
 
-  private def handleQueryException(
+  def handleQueryException(
       e: Exception,
       message: String,
       errorSource: Option[String] = None,
@@ -433,7 +434,7 @@ trait FlintJobExecutor {
       statusCode.map(code => "StatusCode" -> code.toString)
 
     val errorJson = mapper.writeValueAsString(errorDetails)
-    logError(errorJson, e)
+    CustomLogging.logError(new OperationMessage("", statusCode.get), e)
     errorJson
   }
 
@@ -479,4 +480,21 @@ trait FlintJobExecutor {
         handleQueryException(r, "Fail to run query. Cause")
     }
   }
+
+  def parseArgs(args: Array[String]): (Option[String], String) = {
+    args.length match {
+      case 1 =>
+        (None, args(0)) // Starting from OS 2.13, resultIndex is the only argument
+      case 2 =>
+        (
+          Some(args(0)),
+          args(1)
+        ) // Before OS 2.13, there are two arguments, the second one is resultIndex
+      case _ =>
+        val t = new IllegalArgumentException(
+          "Unsupported number of arguments. Expected 1 or 2 arguments.")
+        CustomLogging.logError(t)
+        throw t
+    }
+  }
 }
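
With parseArgs now defined once in FlintJobExecutor, FlintJob and FlintREPL share the same argument contract, and a bad argument count is recorded through CustomLogging before the exception propagates. A short sketch of the expected behaviour, calling the helper through the FlintJob object (index and query strings are placeholders):

    FlintJob.parseArgs(Array("query_results_index"))             // => (None, "query_results_index")
    FlintJob.parseArgs(Array("SELECT 1", "query_results_index")) // => (Some("SELECT 1"), "query_results_index")
    FlintJob.parseArgs(Array.empty[String])                      // logs via CustomLogging.logError, then throws IllegalArgumentException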
FlintREPL.scala
@@ -21,6 +21,7 @@ import org.opensearch.common.Strings
 import org.opensearch.flint.app.{FlintCommand, FlintInstance}
 import org.opensearch.flint.app.FlintInstance.formats
 import org.opensearch.flint.core.FlintOptions
+import org.opensearch.flint.core.logging.CustomLogging
 import org.opensearch.flint.core.metrics.MetricConstants
 import org.opensearch.flint.core.metrics.MetricsUtil.{getTimerContext, incrementCounter, registerGauge, stopTimer}
 import org.opensearch.flint.core.storage.{FlintReader, OpenSearchUpdater}
@@ -84,7 +85,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
     conf.set("spark.sql.defaultCatalog", dataSource)
 
     val jobType = conf.get(FlintSparkConf.JOB_TYPE.key, FlintSparkConf.JOB_TYPE.defaultValue.get)
-    logInfo(s"""Job type is: ${FlintSparkConf.JOB_TYPE.defaultValue.get}""")
+    CustomLogging.logInfo(s"""Job type is: ${FlintSparkConf.JOB_TYPE.defaultValue.get}""")
     conf.set(FlintSparkConf.JOB_TYPE.key, jobType)
 
     val query = getQuery(queryOption, jobType, conf)
@@ -238,21 +239,6 @@ object FlintREPL extends Logging with FlintJobExecutor {
     }
   }
 
-  def parseArgs(args: Array[String]): (Option[String], String) = {
-    args.length match {
-      case 1 =>
-        (None, args(0)) // Starting from OS 2.13, resultIndex is the only argument
-      case 2 =>
-        (
-          Some(args(0)),
-          args(1)
-        ) // Before OS 2.13, there are two arguments, the second one is resultIndex
-      case _ =>
-        throw new IllegalArgumentException(
-          "Unsupported number of arguments. Expected 1 or 2 arguments.")
-    }
-  }
-
   def getQuery(queryOption: Option[String], jobType: String, conf: SparkConf): String = {
     queryOption.getOrElse {
       if (jobType.equalsIgnoreCase("streaming")) {
