Skip to content

Commit

Permalink
Apply new logging format to record exceptions
Browse files (browse the repository at this point in the history)
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Apr 22, 2024
1 parent b5ab7bd commit ca2d479
Show file tree
Hide file tree
Showing 11 changed files with 39 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

import java.io.IOException;

import static org.opensearch.flint.core.metrics.MetricConstants.*;
import static org.opensearch.flint.core.metrics.MetricsConstant.*;

/**
* A wrapper class for RestHighLevelClient to facilitate OpenSearch operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public class CustomLogging {
private static final Map<String, BiConsumer<String, Throwable>> logLevelActions = new HashMap<>();

static {
String[] parts = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN).split(":");
DOMAIN_NAME = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN);
String[] parts = DOMAIN_NAME.split(":");
CLIENT_ID = parts.length == 2 ? parts[0] : UNKNOWN;
DOMAIN_NAME = parts.length == 2 ? parts[1] : UNKNOWN;

logLevelActions.put("DEBUG", logger::debug);
logLevelActions.put("INFO", logger::info);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
/**
* This class defines custom metric constants used for monitoring flint operations.
*/
public final class MetricConstants {
public final class MetricsConstant {

/**
* The prefix for all read-related metrics in OpenSearch.
Expand Down Expand Up @@ -107,7 +107,7 @@ public final class MetricConstants {
*/
public static final String STREAMING_HEARTBEAT_FAILED_METRIC = "streaming.heartbeat.failed.count";

private MetricConstants() {
private MetricsConstant() {
// Private constructor to prevent instantiation
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,17 @@

package org.opensearch.flint.core.storage;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.Strings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

import static org.opensearch.flint.core.metrics.MetricConstants.REQUEST_METADATA_READ_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricsConstant.REQUEST_METADATA_READ_METRIC_PREFIX;

/**
* {@link OpenSearchReader} using search. https://opensearch.org/docs/latest/api-reference/search/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
import java.util.logging.Level;
import java.util.logging.Logger;

import static org.opensearch.flint.core.metrics.MetricConstants.REQUEST_METADATA_READ_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricConstants.REQUEST_METADATA_WRITE_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricsConstant.REQUEST_METADATA_READ_METRIC_PREFIX;
import static org.opensearch.flint.core.metrics.MetricsConstant.REQUEST_METADATA_WRITE_METRIC_PREFIX;

/**
* Provides functionality for updating and upserting documents in an OpenSearch index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import scala.sys.addShutdownHook

import org.opensearch.flint.core.FlintClient
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState.{FAILED, REFRESHING}
import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}
import org.opensearch.flint.core.metrics.{MetricsConstant, MetricsUtil}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
Expand Down Expand Up @@ -66,7 +66,7 @@ class FlintSparkIndexMonitor(
} catch {
case e: Throwable =>
logError("Failed to update index log entry", e)
MetricsUtil.incrementCounter(MetricConstants.STREAMING_HEARTBEAT_FAILED_METRIC)
MetricsUtil.incrementCounter(MetricsConstant.STREAMING_HEARTBEAT_FAILED_METRIC)
}
},
15, // Delay to ensure final logging is complete first, otherwise version conflicts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.XContentType
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions}
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metrics.MetricConstants
import org.opensearch.flint.core.metrics.MetricsConstant
import org.opensearch.flint.core.metrics.MetricsUtil.registerGauge
import play.api.libs.json._

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.types.{StructField, _}
import org.apache.spark.sql.types._

/**
* Spark SQL Application entrypoint
Expand Down Expand Up @@ -78,7 +78,7 @@ object FlintJob extends Logging with FlintJobExecutor {
resultIndex,
jobType.equalsIgnoreCase("streaming"),
streamingRunningCount)
registerGauge(MetricConstants.STREAMING_RUNNING_METRIC, streamingRunningCount)
registerGauge(MetricsConstant.STREAMING_RUNNING_METRIC, streamingRunningCount)
jobOperator.start()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import scala.concurrent.duration.{Duration, MINUTES}
import com.amazonaws.services.s3.model.AmazonS3Exception
import org.apache.commons.text.StringEscapeUtils.unescapeJava
import org.opensearch.flint.core.{FlintClient, IRestHighLevelClient}
import org.opensearch.flint.core.logging.CustomLogging
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metrics.MetricConstants
import org.opensearch.flint.core.metrics.MetricsConstant
import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter
import play.api.libs.json.{JsArray, JsBoolean, JsObject, Json, JsString, JsValue}

Expand Down Expand Up @@ -108,11 +109,11 @@ trait FlintJobExecutor {
.mode("append")
.save(resultIndex)
IRestHighLevelClient.recordOperationSuccess(
MetricConstants.RESULT_METADATA_WRITE_METRIC_PREFIX)
MetricsConstant.RESULT_METADATA_WRITE_METRIC_PREFIX)
} catch {
case e: Exception =>
IRestHighLevelClient.recordOperationFailure(
MetricConstants.RESULT_METADATA_WRITE_METRIC_PREFIX,
MetricsConstant.RESULT_METADATA_WRITE_METRIC_PREFIX,
e)
}
}
Expand Down Expand Up @@ -402,7 +403,7 @@ trait FlintJobExecutor {
queryId: String,
sessionId: String): String = {
val error = s"$message: ${e.getMessage}"
logError(error, e)
CustomLogging.logError(error, e)
error
}

Expand All @@ -422,7 +423,7 @@ trait FlintJobExecutor {
case r: ParseException =>
handleQueryException(r, "Syntax error", spark, dataSource, query, queryId, sessionId)
case r: AmazonS3Exception =>
incrementCounter(MetricConstants.S3_ERR_CNT_METRIC)
incrementCounter(MetricsConstant.S3_ERR_CNT_METRIC)
handleQueryException(
r,
"Fail to read data from S3. Cause",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.opensearch.common.Strings
import org.opensearch.flint.app.{FlintCommand, FlintInstance}
import org.opensearch.flint.app.FlintInstance.formats
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.metrics.MetricConstants
import org.opensearch.flint.core.metrics.MetricsConstant
import org.opensearch.flint.core.metrics.MetricsUtil.{decrementCounter, getTimerContext, incrementCounter, registerGauge, stopTimer}
import org.opensearch.flint.core.storage.{FlintReader, OpenSearchUpdater}
import org.opensearch.search.sort.SortOrder
Expand Down Expand Up @@ -102,7 +102,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
resultIndex,
true,
streamingRunningCount)
registerGauge(MetricConstants.STREAMING_RUNNING_METRIC, streamingRunningCount)
registerGauge(MetricsConstant.STREAMING_RUNNING_METRIC, streamingRunningCount)
jobOperator.start()
} else {
// we don't allow default value for sessionIndex and sessionId. Throw exception if key not found.
Expand Down Expand Up @@ -136,7 +136,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
conf.getLong("spark.flint.job.queryWaitTimeoutMillis", DEFAULT_QUERY_WAIT_TIMEOUT_MILLIS)

val flintSessionIndexUpdater = osClient.createUpdater(sessionIndex.get)
val sessionTimerContext = getTimerContext(MetricConstants.REPL_PROCESSING_TIME_METRIC)
val sessionTimerContext = getTimerContext(MetricsConstant.REPL_PROCESSING_TIME_METRIC)

addShutdownHook(
flintSessionIndexUpdater,
Expand All @@ -149,8 +149,8 @@ object FlintREPL extends Logging with FlintJobExecutor {
val threadPool =
threadPoolFactory.newDaemonThreadPoolScheduledExecutor("flint-repl-heartbeat", 1)

registerGauge(MetricConstants.REPL_RUNNING_METRIC, sessionRunningCount)
registerGauge(MetricConstants.STATEMENT_RUNNING_METRIC, statementRunningCount)
registerGauge(MetricsConstant.REPL_RUNNING_METRIC, sessionRunningCount)
registerGauge(MetricsConstant.STATEMENT_RUNNING_METRIC, statementRunningCount)
val jobStartTime = currentTimeProvider.currentEpochMillis()
// update heart beat every 30 seconds
// OpenSearch triggers recovery after 1 minute outdated heart beat
Expand Down Expand Up @@ -583,7 +583,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
canProceed = false
} else {
val statementTimerContext = getTimerContext(
MetricConstants.STATEMENT_PROCESSING_TIME_METRIC)
MetricsConstant.STATEMENT_PROCESSING_TIME_METRIC)
val flintCommand = processCommandInitiation(flintReader, flintSessionIndexUpdater)

val (dataToWrite, returnedVerificationResult) = processStatementOnVerification(
Expand Down Expand Up @@ -999,15 +999,15 @@ object FlintREPL extends Logging with FlintJobExecutor {
Thread.currentThread().interrupt()
logError("HeartBeatUpdater task was interrupted", ie)
incrementCounter(
MetricConstants.REQUEST_METADATA_HEARTBEAT_FAILED_METRIC
MetricsConstant.REQUEST_METADATA_HEARTBEAT_FAILED_METRIC
) // Record heartbeat failure metric
// maybe due to invalid sequence number or primary term
case e: Exception =>
logWarning(
s"""Fail to update the last update time of the flint instance ${sessionId}""",
e)
incrementCounter(
MetricConstants.REQUEST_METADATA_HEARTBEAT_FAILED_METRIC
MetricsConstant.REQUEST_METADATA_HEARTBEAT_FAILED_METRIC
) // Record heartbeat failure metric
}
}
Expand Down Expand Up @@ -1124,15 +1124,15 @@ object FlintREPL extends Logging with FlintJobExecutor {
if (sessionRunningCount.get() > 0) {
sessionRunningCount.decrementAndGet()
}
incrementCounter(MetricConstants.REPL_SUCCESS_METRIC)
incrementCounter(MetricsConstant.REPL_SUCCESS_METRIC)
}

/**
 * Records the bookkeeping for a REPL session that ended in failure: stops the session timer,
 * decrements the running-session count when it is positive, and increments the REPL failure
 * counter.
 *
 * @param sessionTimerContext
 *   timer context for the session; it is passed to `stopTimer`
 */
private def recordSessionFailed(sessionTimerContext: Timer.Context): Unit = {
stopTimer(sessionTimerContext)
// Only decrement when the count is positive, so this method never pushes it below zero.
if (sessionRunningCount.get() > 0) {
sessionRunningCount.decrementAndGet()
}
// NOTE(review): the two calls below differ only in the name of the constants holder
// (MetricConstants vs MetricsConstant). This looks like the before/after pair of a rename
// rendered without diff markers -- confirm that only one call is meant to remain, otherwise
// the failure counter would be incremented twice per failed session.
incrementCounter(MetricConstants.REPL_FAILED_METRIC)
incrementCounter(MetricsConstant.REPL_FAILED_METRIC)
}

private def recordStatementStateChange(
Expand All @@ -1143,9 +1143,9 @@ object FlintREPL extends Logging with FlintJobExecutor {
statementRunningCount.decrementAndGet()
}
if (flintCommand.isComplete()) {
incrementCounter(MetricConstants.STATEMENT_SUCCESS_METRIC)
incrementCounter(MetricsConstant.STATEMENT_SUCCESS_METRIC)
} else if (flintCommand.isFailed()) {
incrementCounter(MetricConstants.STATEMENT_FAILED_METRIC)
incrementCounter(MetricsConstant.STATEMENT_FAILED_METRIC)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import scala.concurrent.{ExecutionContext, Future, TimeoutException}
import scala.concurrent.duration.{Duration, MINUTES}
import scala.util.{Failure, Success, Try}

import org.opensearch.flint.core.metrics.MetricConstants
import org.opensearch.flint.core.metrics.MetricsConstant
import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter
import org.opensearch.flint.core.storage.OpenSearchUpdater

Expand Down Expand Up @@ -145,8 +145,8 @@ case class JobOperator(
}

exceptionThrown match {
case true => incrementCounter(MetricConstants.STREAMING_FAILED_METRIC)
case false => incrementCounter(MetricConstants.STREAMING_SUCCESS_METRIC)
case true => incrementCounter(MetricsConstant.STREAMING_FAILED_METRIC)
case false => incrementCounter(MetricsConstant.STREAMING_SUCCESS_METRIC)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.{NamedXContentRegistry, XContentParser, XContentType}
import org.opensearch.common.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder, FlintOptions, IRestHighLevelClient}
import org.opensearch.flint.core.metrics.MetricConstants
import org.opensearch.flint.core.metrics.MetricsConstant
import org.opensearch.flint.core.storage.{FlintReader, OpenSearchQueryReader, OpenSearchScrollReader, OpenSearchUpdater}
import org.opensearch.index.query.{AbstractQueryBuilder, MatchAllQueryBuilder, QueryBuilder}
import org.opensearch.plugins.SearchPlugin
Expand Down Expand Up @@ -78,11 +78,11 @@ class OSClient(val flintOptions: FlintOptions) extends Logging {
client.createIndex(request, RequestOptions.DEFAULT)
logInfo(s"create $osIndexName successfully")
IRestHighLevelClient.recordOperationSuccess(
MetricConstants.RESULT_METADATA_WRITE_METRIC_PREFIX)
MetricsConstant.RESULT_METADATA_WRITE_METRIC_PREFIX)
} catch {
case e: Exception =>
IRestHighLevelClient.recordOperationFailure(
MetricConstants.RESULT_METADATA_WRITE_METRIC_PREFIX,
MetricsConstant.RESULT_METADATA_WRITE_METRIC_PREFIX,
e)
throw new IllegalStateException(s"Failed to create index $osIndexName", e);
}
Expand Down Expand Up @@ -122,11 +122,11 @@ class OSClient(val flintOptions: FlintOptions) extends Logging {
result match {
case Success(response) =>
IRestHighLevelClient.recordOperationSuccess(
MetricConstants.REQUEST_METADATA_READ_METRIC_PREFIX)
MetricsConstant.REQUEST_METADATA_READ_METRIC_PREFIX)
response
case Failure(e: Exception) =>
IRestHighLevelClient.recordOperationFailure(
MetricConstants.REQUEST_METADATA_READ_METRIC_PREFIX,
MetricsConstant.REQUEST_METADATA_READ_METRIC_PREFIX,
e)
throw new IllegalStateException(
String.format(
Expand Down

0 comments on commit ca2d479

Please sign in to comment.