Skip to content

Commit

Permalink
Attempting to fix the flaky tests in AwsCloudMapPeerListProviderTest.…
Browse files Browse the repository at this point in the history
… This uses a higher wait for changes, and refreshes at sub-second intervals for testing. (opensearch-project#3628)

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Nov 10, 2023
1 parent 95dd099 commit 7df58f4
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import software.amazon.awssdk.services.servicediscovery.model.DiscoverInstancesResponse;
import software.amazon.awssdk.services.servicediscovery.model.HttpInstanceSummary;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -48,7 +49,7 @@ class AwsCloudMapPeerListProvider implements PeerListProvider, AutoCloseable {
private final String serviceName;
private final Map<String, String> queryParameters;
private final AwsCloudMapDynamicEndpointGroup endpointGroup;
private final int timeToRefreshSeconds;
private final Duration timeToRefresh;
private final Backoff backoff;
private final EventLoop eventLoop;
private final String domainName;
Expand All @@ -58,18 +59,18 @@ class AwsCloudMapPeerListProvider implements PeerListProvider, AutoCloseable {
final String namespaceName,
final String serviceName,
final Map<String, String> queryParameters,
final int timeToRefreshSeconds,
final Duration timeToRefresh,
final Backoff backoff,
final PluginMetrics pluginMetrics) {
this.awsServiceDiscovery = Objects.requireNonNull(awsServiceDiscovery);
this.namespaceName = Objects.requireNonNull(namespaceName);
this.serviceName = Objects.requireNonNull(serviceName);
this.queryParameters = Objects.requireNonNull(queryParameters);
this.timeToRefreshSeconds = timeToRefreshSeconds;
this.timeToRefresh = timeToRefresh;
this.backoff = Objects.requireNonNull(backoff);

if (timeToRefreshSeconds < 1)
throw new IllegalArgumentException("timeToRefreshSeconds must be positive. Actual: " + timeToRefreshSeconds);
if (timeToRefresh.isNegative() || timeToRefresh.isZero())
throw new IllegalArgumentException("timeToRefreshSeconds must be positive. Actual: " + timeToRefresh);

eventLoop = CommonPools.workerGroup().next();
LOG.info("Using AWS CloudMap for Peer Forwarding. namespace='{}', serviceName='{}'",
Expand All @@ -92,7 +93,7 @@ static AwsCloudMapPeerListProvider createPeerListProvider(final PeerForwarderCon
final Map<String, String> queryParameters = peerForwarderConfiguration.getAwsCloudMapQueryParameters();

final Backoff standardBackoff = Backoff.exponential(ONE_SECOND, TWENTY_SECONDS).withJitter(TWENTY_PERCENT);
final int timeToRefreshSeconds = 20;
final Duration timeToRefresh = Duration.ofSeconds(20);

final PluginMetrics awsSdkMetrics = PluginMetrics.fromNames("sdk", "aws");

Expand All @@ -108,7 +109,7 @@ static AwsCloudMapPeerListProvider createPeerListProvider(final PeerForwarderCon
namespace,
serviceName,
queryParameters,
timeToRefreshSeconds,
timeToRefresh,
standardBackoff,
pluginMetrics);
}
Expand Down Expand Up @@ -175,7 +176,7 @@ private void discoverInstances() {
LOG.warn("Failed to update endpoints.", ex);
} finally {
scheduledDiscovery = eventLoop.schedule(this::discoverInstances,
timeToRefreshSeconds, TimeUnit.SECONDS);
timeToRefresh.toMillis(), TimeUnit.MILLISECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@

package org.opensearch.dataprepper.peerforwarder.discovery;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.retry.Backoff;
import org.apache.commons.lang3.RandomStringUtils;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
Expand All @@ -19,11 +17,13 @@
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import software.amazon.awssdk.services.servicediscovery.ServiceDiscoveryAsyncClient;
import software.amazon.awssdk.services.servicediscovery.model.DiscoverInstancesRequest;
import software.amazon.awssdk.services.servicediscovery.model.DiscoverInstancesResponse;
import software.amazon.awssdk.services.servicediscovery.model.HttpInstanceSummary;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -38,6 +38,7 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
Expand All @@ -59,7 +60,7 @@ class AwsCloudMapPeerListProviderTest {
private String namespaceName;
private String serviceName;
private Map<String, String> queryParameters;
private int timeToRefreshSeconds;
private Duration timeToRefresh;
private Backoff backoff;
private PluginMetrics pluginMetrics;
private List<AwsCloudMapPeerListProvider> objectsToClose;
Expand All @@ -71,7 +72,7 @@ void setUp() {
serviceName = RandomStringUtils.randomAlphabetic(10);
queryParameters = generateRandomStringMap();

timeToRefreshSeconds = 1;
timeToRefresh = Duration.ofMillis(200);
backoff = mock(Backoff.class);
pluginMetrics = mock(PluginMetrics.class);

Expand All @@ -85,7 +86,7 @@ void tearDown() {

private AwsCloudMapPeerListProvider createObjectUnderTest() {
final AwsCloudMapPeerListProvider objectUnderTest =
new AwsCloudMapPeerListProvider(awsServiceDiscovery, namespaceName, serviceName, queryParameters, timeToRefreshSeconds, backoff, pluginMetrics);
new AwsCloudMapPeerListProvider(awsServiceDiscovery, namespaceName, serviceName, queryParameters, timeToRefresh, backoff, pluginMetrics);
objectsToClose.add(objectUnderTest);
return objectUnderTest;
}
Expand Down Expand Up @@ -133,7 +134,7 @@ void constructor_throws_with_null_Backoff() {
@ParameterizedTest
@ValueSource(ints = {Integer.MIN_VALUE, -10, -1, 0})
void constructor_throws_with_non_positive_timeToRefreshSeconds(final int badTimeToRefresh) {
timeToRefreshSeconds = badTimeToRefresh;
timeToRefresh = Duration.ofSeconds(badTimeToRefresh);

assertThrows(IllegalArgumentException.class,
this::createObjectUnderTest);
Expand Down Expand Up @@ -374,7 +375,7 @@ private void waitUntilDiscoverInstancesCalledAtLeastOnce() {
*/
private void waitUntilDiscoverInstancesCalledAtLeast(final int timesCalled) {
final long waitTimeMillis = (long) timesCalled * WAIT_TIME_MULTIPLIER_MILLIS;
Awaitility.waitAtMost(waitTimeMillis, TimeUnit.MILLISECONDS)
await().atMost(waitTimeMillis, TimeUnit.MILLISECONDS)
.pollDelay(100, TimeUnit.MILLISECONDS)
.untilAsserted(() -> then(awsServiceDiscovery)
.should(atLeast(timesCalled))
Expand All @@ -388,7 +389,7 @@ private void waitUntilDiscoverInstancesCalledAtLeast(final int timesCalled) {
* @param objectUnderTest The object to wait for.
*/
private void waitUntilPeerListPopulated(final AwsCloudMapPeerListProvider objectUnderTest) {
Awaitility.waitAtMost(2, TimeUnit.SECONDS)
await().atMost(5, TimeUnit.SECONDS)
.pollDelay(100, TimeUnit.MILLISECONDS)
.untilAsserted(() -> {
final List<String> actualPeers = objectUnderTest.getPeerList();
Expand All @@ -411,7 +412,7 @@ private static Map<String, String> generateRandomStringMap() {

final Map<String, String> map = new HashMap<>();
IntStream.range(0, random.nextInt(5) + 1)
.mapToObj(num -> map.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()));
.forEach(num -> map.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()));

return map;
}
Expand Down

0 comments on commit 7df58f4

Please sign in to comment.