Skip to content

Commit

Permalink
[controller][test] Fix race condition during controller startup and f…
Browse files Browse the repository at this point in the history
…ix flakies (#714)

- Setup admin topic for a cluster before running cluster initialization routines  
- Close cluster wrapper at the end of test in `PartialUpdateClusterConfigTest`  
- Close local broker created during `TestActiveActiveIngestion::testAAIngestionWithStoreView`  
- Use `volatile` `PubSubProducerAdapter::forceClosed` flag to make update visible to a callback thread  
- Use unique data paths for venice-server-data in `AbstractClientEndToEndSetup` to avoid race condition
  • Loading branch information
sushantmane authored Oct 26, 2023
1 parent 461df8c commit 0ad5eba
Show file tree
Hide file tree
Showing 8 changed files with 658 additions and 644 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class ApacheKafkaProducerAdapter implements PubSubProducerAdapter {

private KafkaProducer<KafkaKey, KafkaMessageEnvelope> producer;
private final ApacheKafkaProducerConfig producerConfig;
private boolean forceClosed = false;
private volatile boolean forceClosed = false;

/**
* @param producerConfig contains producer configs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -65,6 +66,11 @@ public void setUp() {
this.parentController = parentControllers.get(0);
}

@AfterClass(alwaysRun = true)
public void tearDown() {
multiRegionMultiClusterWrapper.close();
}

@Test(timeOut = TEST_TIMEOUT_MS, dataProvider = "True-and-False", dataProviderClass = DataProviderUtils.class)
public void testPartialUpdateAutoEnable(boolean activeActiveEnabled) {
final String storeName = Utils.getUniqueString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ public void testIncrementalPushPartialUpdateClassicFormat() throws IOException {
runVPJ(vpjProperties, 1, childControllerClient);
}
VeniceClusterWrapper veniceClusterWrapper = childDatacenters.get(0).getClusters().get(CLUSTER_NAME);
veniceClusterWrapper.waitVersion(storeName, 1);
try (AvroGenericStoreClient<Object, Object> storeReader = ClientFactory.getAndStartGenericAvroClient(
ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(veniceClusterWrapper.getRandomRouterURL()))) {
TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> {
Expand Down Expand Up @@ -389,6 +390,8 @@ public void testIncrementalPushPartialUpdateNewFormat() throws IOException {
runVPJ(vpjProperties, 1, childControllerClient);
}
VeniceClusterWrapper veniceClusterWrapper = childDatacenters.get(0).getClusters().get(CLUSTER_NAME);
veniceClusterWrapper.waitVersion(storeName, 1);

try (AvroGenericStoreClient<Object, Object> storeReader = ClientFactory.getAndStartGenericAvroClient(
ClientConfig.defaultGenericClientConfig(storeName).setVeniceURL(veniceClusterWrapper.getRandomRouterURL()))) {
TestUtils.waitForNonDeterministicAssertion(10, TimeUnit.SECONDS, true, () -> {
Expand Down Expand Up @@ -452,6 +455,7 @@ public void testPartialUpdateOnBatchPushedKeys(CompressionStrategy compressionSt
try (ControllerClient childControllerClient = new ControllerClient(CLUSTER_NAME, childControllerUrl)) {
runVPJ(vpjProperties, 1, childControllerClient);
}
veniceClusterWrapper.waitVersion(storeName, 1);
// Produce partial updates on batch pushed keys
SystemProducer veniceProducer = getSamzaProducer(veniceClusterWrapper, storeName, Version.PushType.STREAM);
for (int i = 1; i < 100; i++) {
Expand Down Expand Up @@ -1179,7 +1183,7 @@ public void testWriteComputeWithHybridLeaderFollowerLargeRecord(
try (ControllerClient childControllerClient = new ControllerClient(CLUSTER_NAME, childControllerUrl)) {
runVPJ(vpjProperties, 1, childControllerClient);
}

veniceClusterWrapper.waitVersion(storeName, 1);
try (AvroGenericStoreClient<Object, Object> storeReader = ClientFactory.getAndStartGenericAvroClient(
ClientConfig.defaultGenericClientConfig(storeName)
.setVeniceURL(veniceClusterWrapper.getRandomRouterURL()))) {
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,9 @@ public void setUp() throws Exception {

d2Client = D2TestUtils.getAndStartHttpsD2Client(veniceCluster.getZk().getAddress());

dataPath = Paths.get(System.getProperty("java.io.tmpdir"), "venice-server-data").toAbsolutePath().toString();
dataPath = Paths.get(System.getProperty("java.io.tmpdir"), Utils.getUniqueString("venice-server-data"))
.toAbsolutePath()
.toString();

prepareData();
prepareMetaSystemStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@


public class VeniceSystemFactoryTest {
private static final int TEST_TIMEOUT = 15000; // ms
private static final int TEST_TIMEOUT = 90000; // ms

private VeniceClusterWrapper cluster;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,8 +534,6 @@ void setVeniceWriterForCluster(String clusterName, VeniceWriter writer) {
*/
@Override
public synchronized void initStorageCluster(String clusterName) {
getVeniceHelixAdmin().initStorageCluster(clusterName);
asyncSetupEnabledMap.put(clusterName, true);
/*
* We might not be able to call a lot of functions of veniceHelixAdmin since
* current controller might not be the leader controller for the given clusterName
Expand Down Expand Up @@ -574,6 +572,9 @@ public synchronized void initStorageCluster(String clusterName) {
.setPartitionCount(AdminTopicUtils.PARTITION_NUM_FOR_ADMIN_TOPIC)
.build());
});

getVeniceHelixAdmin().initStorageCluster(clusterName);
asyncSetupEnabledMap.put(clusterName, true);
}

/**
Expand Down

0 comments on commit 0ad5eba

Please sign in to comment.