diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index fdd48fe0ee2af..446b3bb0bb59b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -579,6 +579,7 @@ public void apply(Settings value, Settings current, Settings previous) { FsHealthService.ENABLED_SETTING, FsHealthService.REFRESH_INTERVAL_SETTING, FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING, + FsHealthService.HEALTHY_TIMEOUT_SETTING, TransportMainAction.OVERRIDE_MAIN_RESPONSE_VERSION, IndexingPressure.MAX_INDEXING_BYTES))); diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java index 5c5108a47e080..7bf1b3207aca6 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsHealthService.java @@ -57,6 +57,8 @@ import java.nio.file.StandardOpenOption; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -78,6 +80,9 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH private final NodeEnvironment nodeEnv; private final LongSupplier currentTimeMillisSupplier; private volatile Scheduler.Cancellable scheduledFuture; + private volatile TimeValue healthyTimeoutThreshold; + private final AtomicLong lastRunStartTimeMillis = new AtomicLong(Long.MIN_VALUE); + private final AtomicBoolean checkInProgress = new AtomicBoolean(); @Nullable private volatile Set<Path> unhealthyPaths; @@ -85,11 +90,14 @@ public class FsHealthService extends AbstractLifecycleComponent implements NodeH public static final Setting<Boolean> ENABLED_SETTING =
Setting.boolSetting("monitor.fs.health.enabled", true, Setting.Property.NodeScope, Setting.Property.Dynamic); public static final Setting<TimeValue> REFRESH_INTERVAL_SETTING = - Setting.timeSetting("monitor.fs.health.refresh_interval", TimeValue.timeValueSeconds(120), TimeValue.timeValueMillis(1), + Setting.timeSetting("monitor.fs.health.refresh_interval", TimeValue.timeValueSeconds(60), TimeValue.timeValueMillis(1), Setting.Property.NodeScope); public static final Setting<TimeValue> SLOW_PATH_LOGGING_THRESHOLD_SETTING = Setting.timeSetting("monitor.fs.health.slow_path_logging_threshold", TimeValue.timeValueSeconds(5), TimeValue.timeValueMillis(1), Setting.Property.NodeScope, Setting.Property.Dynamic); + public static final Setting<TimeValue> HEALTHY_TIMEOUT_SETTING = + Setting.timeSetting("monitor.fs.health.healthy_timeout_threshold", TimeValue.timeValueSeconds(60), TimeValue.timeValueMillis(1), + Setting.Property.NodeScope, Setting.Property.Dynamic); public FsHealthService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, NodeEnvironment nodeEnv) { @@ -98,8 +106,10 @@ public FsHealthService(Settings settings, ClusterSettings clusterSettings, Threa this.refreshInterval = REFRESH_INTERVAL_SETTING.get(settings); this.slowPathLoggingThreshold = SLOW_PATH_LOGGING_THRESHOLD_SETTING.get(settings); this.currentTimeMillisSupplier = threadPool::relativeTimeInMillis; + this.healthyTimeoutThreshold = HEALTHY_TIMEOUT_SETTING.get(settings); this.nodeEnv = nodeEnv; clusterSettings.addSettingsUpdateConsumer(SLOW_PATH_LOGGING_THRESHOLD_SETTING, this::setSlowPathLoggingThreshold); + clusterSettings.addSettingsUpdateConsumer(HEALTHY_TIMEOUT_SETTING, this::setHealthyTimeoutThreshold); clusterSettings.addSettingsUpdateConsumer(ENABLED_SETTING, this::setEnabled); } @@ -126,6 +136,10 @@ public void setSlowPathLoggingThreshold(TimeValue slowPathLoggingThreshold) { this.slowPathLoggingThreshold = slowPathLoggingThreshold; } + public void setHealthyTimeoutThreshold(TimeValue
healthyTimeoutThreshold) { + this.healthyTimeoutThreshold = healthyTimeoutThreshold; + } + @Override public StatusInfo getHealth() { StatusInfo statusInfo; @@ -134,6 +148,9 @@ public StatusInfo getHealth() { statusInfo = new StatusInfo(HEALTHY, "health check disabled"); } else if (brokenLock) { statusInfo = new StatusInfo(UNHEALTHY, "health check failed due to broken node lock"); + } else if (checkInProgress.get() && currentTimeMillisSupplier.getAsLong() - + lastRunStartTimeMillis.get() > healthyTimeoutThreshold.millis()) { + statusInfo = new StatusInfo(UNHEALTHY, "healthy threshold breached"); } else if (unhealthyPaths == null) { statusInfo = new StatusInfo(HEALTHY, "health check passed"); } else { @@ -149,19 +166,28 @@ class FsHealthMonitor implements Runnable { static final String TEMP_FILE_NAME = ".opensearch_temp_file"; private byte[] byteToWrite; - FsHealthMonitor(){ + FsHealthMonitor() { this.byteToWrite = UUIDs.randomBase64UUID().getBytes(StandardCharsets.UTF_8); } @Override public void run() { + boolean checkEnabled = enabled; try { - if (enabled) { + if (checkEnabled) { + setLastRunStartTimeMillis(); + boolean started = checkInProgress.compareAndSet(false, true); + assert started; monitorFSHealth(); logger.debug("health check succeeded"); } } catch (Exception e) { logger.error("health check failed", e); + } finally { + if (checkEnabled) { + boolean completed = checkInProgress.compareAndSet(true, false); + assert completed; + } } } @@ -192,6 +218,14 @@ private void monitorFSHealth() { logger.warn("health check of [{}] took [{}ms] which is above the warn threshold of [{}]", path, elapsedTime, slowPathLoggingThreshold); } + if (elapsedTime > healthyTimeoutThreshold.millis()) { + logger.error("health check of [{}] failed, took [{}ms] which is above the healthy threshold of [{}]", + path, elapsedTime, healthyTimeoutThreshold); + if (currentUnhealthyPaths == null) { + currentUnhealthyPaths = new HashSet<>(1); + } + currentUnhealthyPaths.add(path); + } } } catch 
(Exception ex) { logger.error(new ParameterizedMessage("health check of [{}] failed", path), ex); @@ -205,5 +239,9 @@ private void monitorFSHealth() { brokenLock = false; } } + + private void setLastRunStartTimeMillis() { + lastRunStartTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong())); + } } diff --git a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java index aefdde554b3c7..b1a960c0e6710 100644 --- a/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java +++ b/server/src/test/java/org/opensearch/monitor/fs/FsHealthServiceTests.java @@ -44,6 +44,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.env.NodeEnvironment; +import org.opensearch.monitor.StatusInfo; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.MockLogAppender; import org.opensearch.test.junit.annotations.TestLogging; @@ -63,6 +64,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.hamcrest.Matchers.equalTo; import static org.opensearch.monitor.StatusInfo.Status.HEALTHY; import static org.opensearch.monitor.StatusInfo.Status.UNHEALTHY; import static org.opensearch.node.Node.NODE_NAME_SETTING; @@ -170,7 +172,7 @@ public void testLoggingOnHungIO() throws Exception { } //disrupt file system - disruptFileSystemProvider.injectIOException.set(true); + disruptFileSystemProvider.injectIODelay.set(true); fsHealthService.new FsHealthMonitor().run(); assertEquals(env.nodeDataPaths().length, disruptFileSystemProvider.getInjectedPathCount()); assertBusy(mockAppender::assertAllExpectationsMatched); @@ -182,6 +184,60 @@ public void testLoggingOnHungIO() throws Exception { } } + public void testFailsHealthOnHungIOBeyondHealthyTimeout() throws Exception { + long healthyTimeoutThreshold = 
randomLongBetween(500, 1000); + long refreshInterval = randomLongBetween(500, 1000); + long slowLogThreshold = randomLongBetween(100, 200); + long delayBetweenChecks = 100; + final Settings settings = Settings.builder() + .put(FsHealthService.HEALTHY_TIMEOUT_SETTING.getKey(), healthyTimeoutThreshold + "ms") + .put(FsHealthService.REFRESH_INTERVAL_SETTING.getKey(), refreshInterval + "ms") + .put(FsHealthService.SLOW_PATH_LOGGING_THRESHOLD_SETTING.getKey(), slowLogThreshold + "ms") + .put(ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.getKey(), 0)//we need to verify exact time + .build(); + FileSystem fileSystem = PathUtils.getDefaultFileSystem(); + TestThreadPool testThreadPool = new TestThreadPool(getClass().getName(), settings); + FileSystemFsyncHungProvider disruptFileSystemProvider = new FileSystemFsyncHungProvider(fileSystem, testThreadPool); + fileSystem = disruptFileSystemProvider.getFileSystem(null); + PathUtilsForTesting.installMock(fileSystem); + final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + try (NodeEnvironment env = newNodeEnvironment()) { + FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); + logger.info("--> Initial health status prior to the first monitor run"); + StatusInfo fsHealth = fsHealthService.getHealth(); + assertEquals(HEALTHY, fsHealth.getStatus()); + assertEquals("health check passed", fsHealth.getInfo()); + logger.info("--> First monitor run"); + fsHealthService.new FsHealthMonitor().run(); + fsHealth = fsHealthService.getHealth(); + assertEquals(HEALTHY, fsHealth.getStatus()); + assertEquals("health check passed", fsHealth.getInfo()); + logger.info("--> Disrupt file system"); + disruptFileSystemProvider.injectIODelay.set(true); + final FsHealthService fsHealthSrvc = new FsHealthService(settings, clusterSettings, testThreadPool, env); + fsHealthSrvc.doStart(); + assertTrue(waitUntil(() -> 
fsHealthSrvc.getHealth().getStatus() == UNHEALTHY, healthyTimeoutThreshold + (2 *refreshInterval), + TimeUnit.MILLISECONDS)); + fsHealth = fsHealthSrvc.getHealth(); + assertEquals(UNHEALTHY, fsHealth.getStatus()); + assertEquals("healthy threshold breached", fsHealth.getInfo()); + int disruptedPathCount = disruptFileSystemProvider.getInjectedPathCount(); + assertThat(disruptedPathCount, equalTo(1)); + logger.info("--> Fix file system disruption"); + disruptFileSystemProvider.injectIODelay.set(false); + assertTrue(waitUntil(() -> fsHealthSrvc.getHealth().getStatus() == HEALTHY, delayBetweenChecks + (2 * refreshInterval), + TimeUnit.MILLISECONDS)); + fsHealth = fsHealthSrvc.getHealth(); + assertEquals(HEALTHY, fsHealth.getStatus()); + assertEquals("health check passed", fsHealth.getInfo()); + assertEquals(disruptedPathCount, disruptFileSystemProvider.getInjectedPathCount()); + fsHealthSrvc.doStop(); + } finally { + PathUtilsForTesting.teardown(); + ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); + } + } + public void testFailsHealthOnSinglePathFsyncFailure() throws IOException { FileSystem fileSystem = PathUtils.getDefaultFileSystem(); FileSystemFsyncIOExceptionProvider disruptFsyncFileSystemProvider = new FileSystemFsyncIOExceptionProvider(fileSystem); @@ -347,11 +403,12 @@ public void force(boolean metaData) throws IOException { private static class FileSystemFsyncHungProvider extends FilterFileSystemProvider { - AtomicBoolean injectIOException = new AtomicBoolean(); + AtomicBoolean injectIODelay = new AtomicBoolean(); AtomicInteger injectedPaths = new AtomicInteger(); private final long delay; private final ThreadPool threadPool; + private static final long AWAIT_BUSY_THRESHOLD = 100L; FileSystemFsyncHungProvider(FileSystem inner, long delay, ThreadPool threadPool) { super("disrupt_fs_health://", inner); @@ -359,6 +416,12 @@ private static class FileSystemFsyncHungProvider extends FilterFileSystemProvide this.threadPool = threadPool; } + 
FileSystemFsyncHungProvider(FileSystem inner, ThreadPool threadPool) { + super("disrupt_fs_health://", inner); + this.threadPool = threadPool; + this.delay = Long.MAX_VALUE; + } + public int getInjectedPathCount(){ return injectedPaths.get(); } @@ -368,17 +431,20 @@ public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, return new FilterFileChannel(super.newFileChannel(path, options, attrs)) { @Override public void force(boolean metaData) throws IOException { - if (injectIOException.get()) { + if (injectIODelay.get()) { if (path.getFileName().toString().equals(FsHealthService.FsHealthMonitor.TEMP_FILE_NAME)) { injectedPaths.incrementAndGet(); final long startTimeMillis = threadPool.relativeTimeInMillis(); + long timeInMillis = 1; + long maxWaitTimeMillis = startTimeMillis + delay >= 0 ? startTimeMillis + delay : Long.MAX_VALUE;//long overflow do { try { - Thread.sleep(delay); + Thread.sleep(timeInMillis); } catch (InterruptedException e) { throw new AssertionError(e); } - } while (threadPool.relativeTimeInMillis() <= startTimeMillis + delay); + timeInMillis = Math.min(AWAIT_BUSY_THRESHOLD, timeInMillis * 2); + } while (threadPool.relativeTimeInMillis() <= maxWaitTimeMillis && injectIODelay.get());