Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 0.5] Support recovery for index with external scheduler #776

Merged
merged 1 commit into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ lazy val flintCommons = (project in file("flint-commons"))
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"org.scalatest" %% "scalatest-flatspec" % "3.2.15" % "test",
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
"org.projectlombok" % "lombok" % "1.18.30",
"org.projectlombok" % "lombok" % "1.18.30" % "provided",
),
libraryDependencies ++= deps(sparkVersion),
publish / skip := true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.codahale.metrics.Timer;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.FlintMetricSource;
import org.apache.spark.metrics.source.FlintIndexMetricSource;
import org.apache.spark.metrics.source.Source;
import scala.collection.Seq;

Expand All @@ -33,10 +34,20 @@ private MetricsUtil() {
* If the counter does not exist, it is created before being incremented.
*
* @param metricName The name of the metric for which the counter is incremented.
* This name is used to retrieve or create the counter.
*/
public static void incrementCounter(String metricName) {
    // General (non-index) metrics are the default; the two-argument overload
    // performs the lookup and the increment.
    incrementCounter(metricName, false);
}

/**
* Increments the Counter metric associated with the given metric name.
* If the counter does not exist, it is created before being incremented.
*
* @param metricName The name of the metric for which the counter is incremented.
* @param isIndexMetric Whether this metric is an index-specific metric.
*/
public static void incrementCounter(String metricName, boolean isIndexMetric) {
Counter counter = getOrCreateCounter(metricName, isIndexMetric);
if (counter != null) {
counter.inc();
}
Expand All @@ -48,29 +59,48 @@ public static void incrementCounter(String metricName) {
* @param metricName The name of the metric counter to be decremented.
*/
public static void decrementCounter(String metricName) {
    // General (non-index) metrics are the default; the two-argument overload
    // performs the lookup and the guarded decrement.
    decrementCounter(metricName, false);
}

/**
 * Lowers the named counter by one, but only while its current count is above zero.
 * The counter is obtained through {@code getOrCreateCounter}; when that yields
 * {@code null}, or the count is not positive, nothing is changed.
 *
 * @param metricName    The name of the metric counter to be decremented.
 * @param isIndexMetric Whether this metric is an index-specific metric.
 */
public static void decrementCounter(String metricName, boolean isIndexMetric) {
    Counter target = getOrCreateCounter(metricName, isIndexMetric);
    if (target == null) {
        return;
    }
    if (target.getCount() > 0) {
        target.dec();
    }
}

/**
 * Retrieves a {@link Timer.Context} for the specified metric name, creating a new timer if one does not already exist.
 * This context can be used to measure the duration of a particular operation or event.
 * The metric is treated as a general (non-index) metric.
 *
 * @param metricName The name of the metric timer to retrieve the context for.
 * @return A {@link Timer.Context} instance for timing operations, or {@code null} if the timer could not be created or retrieved.
 */
public static Timer.Context getTimerContext(String metricName) {
    return getTimerContext(metricName, false);
}

/**
 * Starts timing against the timer registered under the given metric name.
 * The timer is obtained through {@code getOrCreateTimer}.
 *
 * @param metricName    The name of the metric timer to retrieve the context for.
 * @param isIndexMetric Whether this metric is an index-specific metric.
 * @return A started {@link Timer.Context}, or {@code null} when no timer could be obtained.
 */
public static Timer.Context getTimerContext(String metricName, boolean isIndexMetric) {
    Timer metricTimer = getOrCreateTimer(metricName, isIndexMetric);
    if (metricTimer == null) {
        return null;
    }
    return metricTimer.time();
}

/**
* Stops the timer associated with the given {@link Timer.Context}, effectively recording the elapsed time since the timer was started
* and returning the duration. If the context is {@code null}, this method does nothing and returns {@code null}.
* Stops the timer associated with the given {@link Timer.Context}.
*
* @param context The {@link Timer.Context} to stop. May be {@code null}, in which case this method has no effect and returns {@code null}.
* @param context The {@link Timer.Context} to stop. May be {@code null}.
* @return The elapsed time in nanoseconds since the timer was started, or {@code null} if the context was {@code null}.
*/
public static Long stopTimer(Timer.Context context) {
Expand All @@ -79,53 +109,61 @@ public static Long stopTimer(Timer.Context context) {

/**
 * Registers a gauge metric with the provided name and value.
 * The gauge will reflect the current value of the AtomicInteger provided.
 * The metric is treated as a general (non-index) metric.
 *
 * @param metricName The name of the gauge metric to register.
 * @param value      The AtomicInteger whose current value should be reflected by the gauge.
 */
public static void registerGauge(String metricName, final AtomicInteger value) {
    registerGauge(metricName, value, false);
}

/**
 * Registers a gauge under the given name that reports the current value of the supplied AtomicInteger.
 * When no MetricRegistry can be obtained, a warning is logged and no gauge is registered.
 *
 * @param metricName    The name of the gauge metric to register.
 * @param value         The AtomicInteger whose current value should be reflected by the gauge.
 * @param isIndexMetric Whether this metric is an index-specific metric.
 */
public static void registerGauge(String metricName, final AtomicInteger value, boolean isIndexMetric) {
    MetricRegistry registry = getMetricRegistry(isIndexMetric);
    if (registry != null) {
        // The gauge reads the AtomicInteger each time it is sampled.
        Gauge<Integer> currentValue = value::get;
        registry.register(metricName, currentValue);
        return;
    }
    LOG.warning("MetricRegistry not available, cannot register gauge: " + metricName);
}

// Retrieves or creates a new counter for the given metric name
private static Counter getOrCreateCounter(String metricName) {
MetricRegistry metricRegistry = getMetricRegistry();
private static Counter getOrCreateCounter(String metricName, boolean isIndexMetric) {
MetricRegistry metricRegistry = getMetricRegistry(isIndexMetric);
return metricRegistry != null ? metricRegistry.counter(metricName) : null;
}

// Retrieves or creates a new Timer for the given metric name
private static Timer getOrCreateTimer(String metricName) {
MetricRegistry metricRegistry = getMetricRegistry();
private static Timer getOrCreateTimer(String metricName, boolean isIndexMetric) {
MetricRegistry metricRegistry = getMetricRegistry(isIndexMetric);
return metricRegistry != null ? metricRegistry.timer(metricName) : null;
}

// Retrieves the MetricRegistry from the current Spark environment.
private static MetricRegistry getMetricRegistry() {
private static MetricRegistry getMetricRegistry(boolean isIndexMetric) {
SparkEnv sparkEnv = SparkEnv.get();
if (sparkEnv == null) {
LOG.warning("Spark environment not available, cannot access MetricRegistry.");
return null;
}

FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(sparkEnv);
return flintMetricSource.metricRegistry();
Source metricSource = isIndexMetric ?
getOrInitMetricSource(sparkEnv, FlintMetricSource.FLINT_INDEX_METRIC_SOURCE_NAME(), FlintIndexMetricSource::new) :
getOrInitMetricSource(sparkEnv, FlintMetricSource.FLINT_METRIC_SOURCE_NAME(), FlintMetricSource::new);
return metricSource.metricRegistry();
}

// Gets or initializes the FlintMetricSource
private static FlintMetricSource getOrInitFlintMetricSource(SparkEnv sparkEnv) {
Seq<Source> metricSourceSeq = sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME());
private static Source getOrInitMetricSource(SparkEnv sparkEnv, String sourceName, java.util.function.Supplier<Source> sourceSupplier) {
Seq<Source> metricSourceSeq = sparkEnv.metricsSystem().getSourcesByName(sourceName);

if (metricSourceSeq == null || metricSourceSeq.isEmpty()) {
FlintMetricSource metricSource = new FlintMetricSource();
Source metricSource = sourceSupplier.get();
sparkEnv.metricsSystem().registerSource(metricSource);
return metricSource;
}
return (FlintMetricSource) metricSourceSeq.head();
return metricSourceSeq.head();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,25 @@ package org.apache.spark.metrics.source

import com.codahale.metrics.MetricRegistry

/**
 * Metric source for general Flint metrics.
 *
 * Exposes its own [[com.codahale.metrics.MetricRegistry]] under the source name
 * `FlintMetricSource.FLINT_METRIC_SOURCE_NAME`.
 */
class FlintMetricSource extends Source {

  // Implementing the Source trait
  override val sourceName: String = FlintMetricSource.FLINT_METRIC_SOURCE_NAME
  override val metricRegistry: MetricRegistry = new MetricRegistry
}

/**
 * Metric source dedicated to Flint index-specific metrics.
 *
 * Holds its own registry, separate from the general Flint source, and is published
 * under `FlintMetricSource.FLINT_INDEX_METRIC_SOURCE_NAME`.
 */
class FlintIndexMetricSource extends Source {

  /** Name under which this source is registered. */
  override val sourceName: String = FlintMetricSource.FLINT_INDEX_METRIC_SOURCE_NAME

  /** Registry holding the index-specific metrics. */
  override val metricRegistry: MetricRegistry = new MetricRegistry()
}

/** Source names shared by the Flint metric sources. */
object FlintMetricSource {

  /** Default source name, used for general Flint metrics. */
  val FLINT_METRIC_SOURCE_NAME: String = "Flint"

  /** Source name used for index-specific metrics. */
  val FLINT_INDEX_METRIC_SOURCE_NAME: String = "FlintIndex"
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@ public class FlintOptions implements Serializable {

// NOTE(review): presumably the default for a shard-support option whose key is declared outside this view — confirm.
public static final String DEFAULT_SUPPORT_SHARD = "true";

// Placeholder used by getAWSAccountId() when FLINT_CLUSTER_NAME is unset or not in two-part "a:b" form.
private static final String UNKNOWN = "UNKNOWN";

// NOTE(review): looks like an option key and its default value ("0") — confirm how "0" is interpreted by the consumer.
public static final String BULK_REQUEST_RATE_LIMIT_PER_NODE = "bulkRequestRateLimitPerNode";
public static final String DEFAULT_BULK_REQUEST_RATE_LIMIT_PER_NODE = "0";
// NOTE(review): presumably the default interval for the external scheduler; its consumer is not visible here — confirm.
public static final String DEFAULT_EXTERNAL_SCHEDULER_INTERVAL = "5 minutes";

public FlintOptions(Map<String, String> options) {
this.options = options;
Expand Down Expand Up @@ -185,9 +188,9 @@ public String getDataSourceName() {
* @return the AWS accountId
*/
public String getAWSAccountId() {
    // FLINT_CLUSTER_NAME is read as two colon-separated parts, the first being the account id.
    // When the variable is unset the default "UNKNOWN:UNKNOWN" is used, so the result is UNKNOWN.
    String clusterName = System.getenv().getOrDefault("FLINT_CLUSTER_NAME", UNKNOWN + ":" + UNKNOWN);
    String[] parts = clusterName.split(":");
    // Any value that does not split into exactly two parts is treated as malformed.
    return parts.length == 2 ? parts[0] : UNKNOWN;
}

public String getSystemIndexName() {
Expand Down
Loading
Loading