diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/discovery/AwsCloudMapPeerListProvider.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/discovery/AwsCloudMapPeerListProvider.java index 5ddca5f10e..1b86adebfe 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/discovery/AwsCloudMapPeerListProvider.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/peerforwarder/discovery/AwsCloudMapPeerListProvider.java @@ -23,6 +23,7 @@ import software.amazon.awssdk.services.servicediscovery.model.DiscoverInstancesResponse; import software.amazon.awssdk.services.servicediscovery.model.HttpInstanceSummary; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Objects; @@ -48,7 +49,7 @@ class AwsCloudMapPeerListProvider implements PeerListProvider, AutoCloseable { private final String serviceName; private final Map queryParameters; private final AwsCloudMapDynamicEndpointGroup endpointGroup; - private final int timeToRefreshSeconds; + private final Duration timeToRefresh; private final Backoff backoff; private final EventLoop eventLoop; private final String domainName; @@ -58,18 +59,18 @@ class AwsCloudMapPeerListProvider implements PeerListProvider, AutoCloseable { final String namespaceName, final String serviceName, final Map queryParameters, - final int timeToRefreshSeconds, + final Duration timeToRefresh, final Backoff backoff, final PluginMetrics pluginMetrics) { this.awsServiceDiscovery = Objects.requireNonNull(awsServiceDiscovery); this.namespaceName = Objects.requireNonNull(namespaceName); this.serviceName = Objects.requireNonNull(serviceName); this.queryParameters = Objects.requireNonNull(queryParameters); - this.timeToRefreshSeconds = timeToRefreshSeconds; + this.timeToRefresh = timeToRefresh; this.backoff = Objects.requireNonNull(backoff); - if (timeToRefreshSeconds < 1) - throw new IllegalArgumentException("timeToRefreshSeconds must be positive. Actual: " + timeToRefreshSeconds); + if (timeToRefresh.isNegative() || timeToRefresh.isZero()) + throw new IllegalArgumentException("timeToRefreshSeconds must be positive. Actual: " + timeToRefresh); eventLoop = CommonPools.workerGroup().next(); LOG.info("Using AWS CloudMap for Peer Forwarding. namespace='{}', serviceName='{}'", @@ -92,7 +93,7 @@ static AwsCloudMapPeerListProvider createPeerListProvider(final PeerForwarderCon final Map queryParameters = peerForwarderConfiguration.getAwsCloudMapQueryParameters(); final Backoff standardBackoff = Backoff.exponential(ONE_SECOND, TWENTY_SECONDS).withJitter(TWENTY_PERCENT); - final int timeToRefreshSeconds = 20; + final Duration timeToRefresh = Duration.ofSeconds(20); final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws"); @@ -108,7 +109,7 @@ static AwsCloudMapPeerListProvider createPeerListProvider(final PeerForwarderCon namespace, serviceName, queryParameters, - timeToRefreshSeconds, + timeToRefresh, standardBackoff, pluginMetrics); } @@ -175,7 +176,7 @@ private void discoverInstances() { LOG.warn("Failed to update endpoints.", ex); } finally { scheduledDiscovery = eventLoop.schedule(this::discoverInstances, - timeToRefreshSeconds, TimeUnit.SECONDS); + timeToRefresh.toMillis(), TimeUnit.MILLISECONDS); } } diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/discovery/AwsCloudMapPeerListProviderTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/discovery/AwsCloudMapPeerListProviderTest.java index 0c7c23093a..67429e27ef 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/discovery/AwsCloudMapPeerListProviderTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/peerforwarder/discovery/AwsCloudMapPeerListProviderTest.java @@ -5,11 +5,9 @@ package org.opensearch.dataprepper.peerforwarder.discovery; -import org.opensearch.dataprepper.metrics.PluginMetrics; import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.client.retry.Backoff; import org.apache.commons.lang3.RandomStringUtils; -import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Nested; @@ -19,11 +17,13 @@ import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.InOrder; +import org.opensearch.dataprepper.metrics.PluginMetrics; import software.amazon.awssdk.services.servicediscovery.ServiceDiscoveryAsyncClient; import software.amazon.awssdk.services.servicediscovery.model.DiscoverInstancesRequest; import software.amazon.awssdk.services.servicediscovery.model.DiscoverInstancesResponse; import software.amazon.awssdk.services.servicediscovery.model.HttpInstanceSummary; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -38,6 +38,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import static org.awaitility.Awaitility.await; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.CoreMatchers.nullValue; @@ -59,7 +60,7 @@ class AwsCloudMapPeerListProviderTest { private String namespaceName; private String serviceName; private Map queryParameters; - private int timeToRefreshSeconds; + private Duration timeToRefresh; private Backoff backoff; private PluginMetrics pluginMetrics; private List objectsToClose; @@ -71,7 +72,7 @@ void setUp() { serviceName = RandomStringUtils.randomAlphabetic(10); queryParameters = generateRandomStringMap(); - timeToRefreshSeconds = 1; + timeToRefresh = Duration.ofMillis(200); backoff = mock(Backoff.class); pluginMetrics = mock(PluginMetrics.class); @@ -85,7 +86,7 @@ void tearDown() { private AwsCloudMapPeerListProvider createObjectUnderTest() { final AwsCloudMapPeerListProvider objectUnderTest = - new AwsCloudMapPeerListProvider(awsServiceDiscovery, namespaceName, serviceName, queryParameters, timeToRefreshSeconds, backoff, pluginMetrics); + new AwsCloudMapPeerListProvider(awsServiceDiscovery, namespaceName, serviceName, queryParameters, timeToRefresh, backoff, pluginMetrics); objectsToClose.add(objectUnderTest); return objectUnderTest; } @@ -133,7 +134,7 @@ void constructor_throws_with_null_Backoff() { @ParameterizedTest @ValueSource(ints = {Integer.MIN_VALUE, -10, -1, 0}) void constructor_throws_with_non_positive_timeToRefreshSeconds(final int badTimeToRefresh) { - timeToRefreshSeconds = badTimeToRefresh; + timeToRefresh = Duration.ofSeconds(badTimeToRefresh); assertThrows(IllegalArgumentException.class, this::createObjectUnderTest); @@ -374,7 +375,7 @@ private void waitUntilDiscoverInstancesCalledAtLeastOnce() { */ private void waitUntilDiscoverInstancesCalledAtLeast(final int timesCalled) { final long waitTimeMillis = (long) timesCalled * WAIT_TIME_MULTIPLIER_MILLIS; - Awaitility.waitAtMost(waitTimeMillis, TimeUnit.MILLISECONDS) + await().atMost(waitTimeMillis, TimeUnit.MILLISECONDS) .pollDelay(100, TimeUnit.MILLISECONDS) .untilAsserted(() -> then(awsServiceDiscovery) .should(atLeast(timesCalled)) @@ -388,7 +389,7 @@ private void waitUntilDiscoverInstancesCalledAtLeast(final int timesCalled) { * @param objectUnderTest The object to wait for. */ private void waitUntilPeerListPopulated(final AwsCloudMapPeerListProvider objectUnderTest) { - Awaitility.waitAtMost(2, TimeUnit.SECONDS) + await().atMost(5, TimeUnit.SECONDS) .pollDelay(100, TimeUnit.MILLISECONDS) .untilAsserted(() -> { final List actualPeers = objectUnderTest.getPeerList(); @@ -411,7 +412,7 @@ private static Map generateRandomStringMap() { final Map map = new HashMap<>(); IntStream.range(0, random.nextInt(5) + 1) - .mapToObj(num -> map.put(UUID.randomUUID().toString(), UUID.randomUUID().toString())); + .forEach(num -> map.put(UUID.randomUUID().toString(), UUID.randomUUID().toString())); return map; }