From ca2d479033f65161b7433ff797637aea4b46298d Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Mon, 22 Apr 2024 13:29:55 -0700 Subject: [PATCH] Apply new logging format to record exceptions Signed-off-by: Louis Chu --- .../core/RestHighLevelClientWrapper.java | 2 +- .../flint/core/logging/CustomLogging.java | 4 ++-- ...ricConstants.java => MetricsConstant.java} | 4 ++-- .../core/storage/OpenSearchQueryReader.java | 10 +------- .../flint/core/storage/OpenSearchUpdater.java | 4 ++-- .../flint/spark/FlintSparkIndexMonitor.scala | 4 ++-- .../scala/org/apache/spark/sql/FlintJob.scala | 6 ++--- .../apache/spark/sql/FlintJobExecutor.scala | 11 +++++---- .../org/apache/spark/sql/FlintREPL.scala | 24 +++++++++---------- .../org/apache/spark/sql/JobOperator.scala | 6 ++--- .../scala/org/apache/spark/sql/OSClient.scala | 10 ++++---- 11 files changed, 39 insertions(+), 46 deletions(-) rename flint-core/src/main/java/org/opensearch/flint/core/metrics/{MetricConstants.java => MetricsConstant.java} (98%) diff --git a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java index fa5696f50..df2cdc912 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/RestHighLevelClientWrapper.java @@ -31,7 +31,7 @@ import java.io.IOException; -import static org.opensearch.flint.core.metrics.MetricConstants.*; +import static org.opensearch.flint.core.metrics.MetricsConstant.*; /** * A wrapper class for RestHighLevelClient to facilitate OpenSearch operations diff --git a/flint-core/src/main/java/org/opensearch/flint/core/logging/CustomLogging.java b/flint-core/src/main/java/org/opensearch/flint/core/logging/CustomLogging.java index 8908e763b..7a24500b7 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/logging/CustomLogging.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/logging/CustomLogging.java @@ -44,9 +44,9 @@ public class CustomLogging { private static final Map> logLevelActions = new HashMap<>(); static { - String[] parts = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN).split(":"); + DOMAIN_NAME = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN); + String[] parts = DOMAIN_NAME.split(":"); CLIENT_ID = parts.length == 2 ? parts[0] : UNKNOWN; - DOMAIN_NAME = parts.length == 2 ? parts[1] : UNKNOWN; logLevelActions.put("DEBUG", logger::debug); logLevelActions.put("INFO", logger::info); diff --git a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsConstant.java similarity index 98% rename from flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java rename to flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsConstant.java index 6a081a740..281e549bf 100644 --- a/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricConstants.java +++ b/flint-core/src/main/java/org/opensearch/flint/core/metrics/MetricsConstant.java @@ -8,7 +8,7 @@ /** * This class defines custom metric constants used for monitoring flint operations. */ -public final class MetricConstants { +public final class MetricsConstant { /** * The prefix for all read-related metrics in OpenSearch. @@ -107,7 +107,7 @@ public final class MetricConstants { */ public static final String STREAMING_HEARTBEAT_FAILED_METRIC = "streaming.heartbeat.failed.count"; - private MetricConstants() { + private MetricsConstant() { // Private constructor to prevent instantiation } } \ No newline at end of file diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchQueryReader.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchQueryReader.java index 19ce6ce8b..7935c9dc7 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchQueryReader.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchQueryReader.java @@ -5,25 +5,17 @@ package org.opensearch.flint.core.storage; -import org.opensearch.OpenSearchStatusException; -import org.opensearch.action.search.ClearScrollRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; -import org.opensearch.action.search.SearchScrollRequest; import org.opensearch.client.RequestOptions; -import org.opensearch.client.RestHighLevelClient; -import org.opensearch.common.Strings; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.flint.core.FlintOptions; import org.opensearch.flint.core.IRestHighLevelClient; import org.opensearch.search.builder.SearchSourceBuilder; import java.io.IOException; import java.util.Optional; -import java.util.logging.Level; import java.util.logging.Logger; -import static org.opensearch.flint.core.metrics.MetricConstants.REQUEST_METADATA_READ_METRIC_PREFIX; +import static org.opensearch.flint.core.metrics.MetricsConstant.REQUEST_METADATA_READ_METRIC_PREFIX; /** * {@link OpenSearchReader} using search. https://opensearch.org/docs/latest/api-reference/search/ diff --git a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java index 0d84b4956..e23087351 100644 --- a/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java +++ b/flint-core/src/main/scala/org/opensearch/flint/core/storage/OpenSearchUpdater.java @@ -12,8 +12,8 @@ import java.util.logging.Level; import java.util.logging.Logger; -import static org.opensearch.flint.core.metrics.MetricConstants.REQUEST_METADATA_READ_METRIC_PREFIX; -import static org.opensearch.flint.core.metrics.MetricConstants.REQUEST_METADATA_WRITE_METRIC_PREFIX; +import static org.opensearch.flint.core.metrics.MetricsConstant.REQUEST_METADATA_READ_METRIC_PREFIX; +import static org.opensearch.flint.core.metrics.MetricsConstant.REQUEST_METADATA_WRITE_METRIC_PREFIX; /** * Provides functionality for updating and upserting documents in an OpenSearch index. diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala index 2f44a28f4..bee1e1ad8 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala @@ -12,7 +12,7 @@ import scala.sys.addShutdownHook import org.opensearch.flint.core.FlintClient import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING} -import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil} +import org.opensearch.flint.core.metrics.{MetricsConstant, MetricsUtil} import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession @@ -66,7 +66,7 @@ class FlintSparkIndexMonitor( } catch { case e: Throwable => logError("Failed to update index log entry", e) - MetricsUtil.incrementCounter(MetricConstants.STREAMING_HEARTBEAT_FAILED_METRIC) + MetricsUtil.incrementCounter(MetricsConstant.STREAMING_HEARTBEAT_FAILED_METRIC) } }, 15, // Delay to ensure final logging is complete first, otherwise version conflicts diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala index 0ac683f7b..b779e0468 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJob.scala @@ -15,7 +15,7 @@ import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.XContentType import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions} import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.core.metrics.MetricConstants +import org.opensearch.flint.core.metrics.MetricsConstant import org.opensearch.flint.core.metrics.MetricsUtil.registerGauge import play.api.libs.json._ @@ -23,7 +23,7 @@ import org.apache.spark.SparkConf import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.flint.config.FlintSparkConf -import org.apache.spark.sql.types.{StructField, _} +import org.apache.spark.sql.types._ /** * Spark SQL Application entrypoint @@ -78,7 +78,7 @@ object FlintJob extends Logging with FlintJobExecutor { resultIndex, jobType.equalsIgnoreCase("streaming"), streamingRunningCount) - registerGauge(MetricConstants.STREAMING_RUNNING_METRIC, streamingRunningCount) + registerGauge(MetricsConstant.STREAMING_RUNNING_METRIC, streamingRunningCount) jobOperator.start() } } diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala index d9b3810cb..a9e6517a8 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintJobExecutor.scala @@ -13,8 +13,9 @@ import scala.concurrent.duration.{Duration, MINUTES} import com.amazonaws.services.s3.model.AmazonS3Exception import org.apache.commons.text.StringEscapeUtils.unescapeJava import org.opensearch.flint.core.{FlintClient, IRestHighLevelClient} +import org.opensearch.flint.core.logging.CustomLogging import org.opensearch.flint.core.metadata.FlintMetadata -import org.opensearch.flint.core.metrics.MetricConstants +import org.opensearch.flint.core.metrics.MetricsConstant import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter import play.api.libs.json.{JsArray, JsBoolean, JsObject, Json, JsString, JsValue} @@ -108,11 +109,11 @@ trait FlintJobExecutor { .mode("append") .save(resultIndex) IRestHighLevelClient.recordOperationSuccess( - MetricConstants.RESULT_METADATA_WRITE_METRIC_PREFIX) + MetricsConstant.RESULT_METADATA_WRITE_METRIC_PREFIX) } catch { case e: Exception => IRestHighLevelClient.recordOperationFailure( - MetricConstants.RESULT_METADATA_WRITE_METRIC_PREFIX, + MetricsConstant.RESULT_METADATA_WRITE_METRIC_PREFIX, e) } } @@ -402,7 +403,7 @@ trait FlintJobExecutor { queryId: String, sessionId: String): String = { val error = s"$message: ${e.getMessage}" - logError(error, e) + CustomLogging.logError(error, e) error } @@ -422,7 +423,7 @@ trait FlintJobExecutor { case r: ParseException => handleQueryException(r, "Syntax error", spark, dataSource, query, queryId, sessionId) case r: AmazonS3Exception => - incrementCounter(MetricConstants.S3_ERR_CNT_METRIC) + incrementCounter(MetricsConstant.S3_ERR_CNT_METRIC) handleQueryException( r, "Fail to read data from S3. Cause", diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala index 69b655e57..3132d9603 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/FlintREPL.scala @@ -21,7 +21,7 @@ import org.opensearch.common.Strings import org.opensearch.flint.app.{FlintCommand, FlintInstance} import org.opensearch.flint.app.FlintInstance.formats import org.opensearch.flint.core.FlintOptions -import org.opensearch.flint.core.metrics.MetricConstants +import org.opensearch.flint.core.metrics.MetricsConstant import org.opensearch.flint.core.metrics.MetricsUtil.{decrementCounter, getTimerContext, incrementCounter, registerGauge, stopTimer} import org.opensearch.flint.core.storage.{FlintReader, OpenSearchUpdater} import org.opensearch.search.sort.SortOrder @@ -102,7 +102,7 @@ object FlintREPL extends Logging with FlintJobExecutor { resultIndex, true, streamingRunningCount) - registerGauge(MetricConstants.STREAMING_RUNNING_METRIC, streamingRunningCount) + registerGauge(MetricsConstant.STREAMING_RUNNING_METRIC, streamingRunningCount) jobOperator.start() } else { // we don't allow default value for sessionIndex and sessionId. Throw exception if key not found. @@ -136,7 +136,7 @@ object FlintREPL extends Logging with FlintJobExecutor { conf.getLong("spark.flint.job.queryWaitTimeoutMillis", DEFAULT_QUERY_WAIT_TIMEOUT_MILLIS) val flintSessionIndexUpdater = osClient.createUpdater(sessionIndex.get) - val sessionTimerContext = getTimerContext(MetricConstants.REPL_PROCESSING_TIME_METRIC) + val sessionTimerContext = getTimerContext(MetricsConstant.REPL_PROCESSING_TIME_METRIC) addShutdownHook( flintSessionIndexUpdater, @@ -149,8 +149,8 @@ object FlintREPL extends Logging with FlintJobExecutor { val threadPool = threadPoolFactory.newDaemonThreadPoolScheduledExecutor("flint-repl-heartbeat", 1) - registerGauge(MetricConstants.REPL_RUNNING_METRIC, sessionRunningCount) - registerGauge(MetricConstants.STATEMENT_RUNNING_METRIC, statementRunningCount) + registerGauge(MetricsConstant.REPL_RUNNING_METRIC, sessionRunningCount) + registerGauge(MetricsConstant.STATEMENT_RUNNING_METRIC, statementRunningCount) val jobStartTime = currentTimeProvider.currentEpochMillis() // update heart beat every 30 seconds // OpenSearch triggers recovery after 1 minute outdated heart beat @@ -583,7 +583,7 @@ object FlintREPL extends Logging with FlintJobExecutor { canProceed = false } else { val statementTimerContext = getTimerContext( - MetricConstants.STATEMENT_PROCESSING_TIME_METRIC) + MetricsConstant.STATEMENT_PROCESSING_TIME_METRIC) val flintCommand = processCommandInitiation(flintReader, flintSessionIndexUpdater) val (dataToWrite, returnedVerificationResult) = processStatementOnVerification( @@ -999,7 +999,7 @@ object FlintREPL extends Logging with FlintJobExecutor { Thread.currentThread().interrupt() logError("HeartBeatUpdater task was interrupted", ie) incrementCounter( - MetricConstants.REQUEST_METADATA_HEARTBEAT_FAILED_METRIC + MetricsConstant.REQUEST_METADATA_HEARTBEAT_FAILED_METRIC ) // Record heartbeat failure metric // maybe due to invalid sequence number or primary term case e: Exception => @@ -1007,7 +1007,7 @@ object FlintREPL extends Logging with FlintJobExecutor { s"""Fail to update the last update time of the flint instance ${sessionId}""", e) incrementCounter( - MetricConstants.REQUEST_METADATA_HEARTBEAT_FAILED_METRIC + MetricsConstant.REQUEST_METADATA_HEARTBEAT_FAILED_METRIC ) // Record heartbeat failure metric } } @@ -1124,7 +1124,7 @@ object FlintREPL extends Logging with FlintJobExecutor { if (sessionRunningCount.get() > 0) { sessionRunningCount.decrementAndGet() } - incrementCounter(MetricConstants.REPL_SUCCESS_METRIC) + incrementCounter(MetricsConstant.REPL_SUCCESS_METRIC) } private def recordSessionFailed(sessionTimerContext: Timer.Context): Unit = { @@ -1132,7 +1132,7 @@ object FlintREPL extends Logging with FlintJobExecutor { if (sessionRunningCount.get() > 0) { sessionRunningCount.decrementAndGet() } - incrementCounter(MetricConstants.REPL_FAILED_METRIC) + incrementCounter(MetricsConstant.REPL_FAILED_METRIC) } private def recordStatementStateChange( @@ -1143,9 +1143,9 @@ object FlintREPL extends Logging with FlintJobExecutor { statementRunningCount.decrementAndGet() } if (flintCommand.isComplete()) { - incrementCounter(MetricConstants.STATEMENT_SUCCESS_METRIC) + incrementCounter(MetricsConstant.STATEMENT_SUCCESS_METRIC) } else if (flintCommand.isFailed()) { - incrementCounter(MetricConstants.STATEMENT_FAILED_METRIC) + incrementCounter(MetricsConstant.STATEMENT_FAILED_METRIC) } } } diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala index 4fb272938..f3f487d99 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/JobOperator.scala @@ -12,7 +12,7 @@ import scala.concurrent.{ExecutionContext, Future, TimeoutException} import scala.concurrent.duration.{Duration, MINUTES} import scala.util.{Failure, Success, Try} -import org.opensearch.flint.core.metrics.MetricConstants +import org.opensearch.flint.core.metrics.MetricsConstant import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter import org.opensearch.flint.core.storage.OpenSearchUpdater @@ -145,8 +145,8 @@ case class JobOperator( } exceptionThrown match { - case true => incrementCounter(MetricConstants.STREAMING_FAILED_METRIC) - case false => incrementCounter(MetricConstants.STREAMING_SUCCESS_METRIC) + case true => incrementCounter(MetricsConstant.STREAMING_FAILED_METRIC) + case false => incrementCounter(MetricsConstant.STREAMING_SUCCESS_METRIC) } } } diff --git a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala index f5e4ec2be..4ee05881d 100644 --- a/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala +++ b/spark-sql-application/src/main/scala/org/apache/spark/sql/OSClient.scala @@ -21,7 +21,7 @@ import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.{NamedXContentRegistry, XContentParser, XContentType} import org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions, IRestHighLevelClient} -import org.opensearch.flint.core.metrics.MetricConstants +import org.opensearch.flint.core.metrics.MetricsConstant import org.opensearch.flint.core.storage.{FlintReader, OpenSearchQueryReader, OpenSearchScrollReader, OpenSearchUpdater} import org.opensearch.index.query.{AbstractQueryBuilder, MatchAllQueryBuilder, QueryBuilder} import org.opensearch.plugins.SearchPlugin @@ -78,11 +78,11 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { client.createIndex(request, RequestOptions.DEFAULT) logInfo(s"create $osIndexName successfully") IRestHighLevelClient.recordOperationSuccess( - MetricConstants.RESULT_METADATA_WRITE_METRIC_PREFIX) + MetricsConstant.RESULT_METADATA_WRITE_METRIC_PREFIX) } catch { case e: Exception => IRestHighLevelClient.recordOperationFailure( - MetricConstants.RESULT_METADATA_WRITE_METRIC_PREFIX, + MetricsConstant.RESULT_METADATA_WRITE_METRIC_PREFIX, e) throw new IllegalStateException(s"Failed to create index $osIndexName", e); } @@ -122,11 +122,11 @@ class OSClient(val flintOptions: FlintOptions) extends Logging { result match { case Success(response) => IRestHighLevelClient.recordOperationSuccess( - MetricConstants.REQUEST_METADATA_READ_METRIC_PREFIX) + MetricsConstant.REQUEST_METADATA_READ_METRIC_PREFIX) response case Failure(e: Exception) => IRestHighLevelClient.recordOperationFailure( - MetricConstants.REQUEST_METADATA_READ_METRIC_PREFIX, + MetricsConstant.REQUEST_METADATA_READ_METRIC_PREFIX, e) throw new IllegalStateException( String.format(