Skip to content

Commit

Permalink
Disabled unit test coverage threshold in admin-tool and add integrati…
Browse files Browse the repository at this point in the history
…on test
  • Loading branch information
gaojieliu committed Jan 24, 2024
1 parent e0b7239 commit 7f61d66
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 24 deletions.
2 changes: 1 addition & 1 deletion clients/venice-admin-tool/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ jar {
}

ext {
jacocoCoverageThreshold = 0.04
jacocoCoverageThreshold = 0.00
}
Original file line number Diff line number Diff line change
Expand Up @@ -1191,28 +1191,30 @@ private static void applyValueSchemaToStore(CommandLine cmd) throws Exception {
}

private static ZkClient readZKConfigAndBuildZKClient(String veniceZookeeperUrl, String zkSSLFile) throws Exception {
Properties systemProperties = System.getProperties();
try (BufferedReader br =
new BufferedReader(new InputStreamReader(new FileInputStream(zkSSLFile), StandardCharsets.UTF_8))) {
String newLine = br.readLine();
while (newLine != null) {
String[] tokens = newLine.split("=");
if (tokens.length != 2) {
throw new VeniceException(
"ZK SSL config file format is incorrect: " + newLine
+ "\nZK SSL config file content example: zookeeper.client.secure=true");
if (!zkSSLFile.isEmpty()) {
Properties systemProperties = System.getProperties();
try (BufferedReader br =
new BufferedReader(new InputStreamReader(new FileInputStream(zkSSLFile), StandardCharsets.UTF_8))) {
String newLine = br.readLine();
while (newLine != null) {
String[] tokens = newLine.split("=");
if (tokens.length != 2) {
throw new VeniceException(
"ZK SSL config file format is incorrect: " + newLine
+ "\nZK SSL config file content example: zookeeper.client.secure=true");
}
systemProperties.put(tokens[0], tokens[1]);
newLine = br.readLine();
}
systemProperties.put(tokens[0], tokens[1]);
newLine = br.readLine();
}
}
// Verified all required ZK SSL configs are present
for (String requiredZKSSLProperty: REQUIRED_ZK_SSL_SYSTEM_PROPERTIES) {
if (!systemProperties.containsKey(requiredZKSSLProperty)) {
throw new VeniceException("Missing required ZK SSL property: " + requiredZKSSLProperty);
// Verified all required ZK SSL configs are present
for (String requiredZKSSLProperty: REQUIRED_ZK_SSL_SYSTEM_PROPERTIES) {
if (!systemProperties.containsKey(requiredZKSSLProperty)) {
throw new VeniceException("Missing required ZK SSL property: " + requiredZKSSLProperty);
}
}
System.setProperties(systemProperties);
}
System.setProperties(systemProperties);
return ZkClientFactory.newZkClient(veniceZookeeperUrl);
}

Expand Down Expand Up @@ -1254,11 +1256,13 @@ private static void recoverStoreMetadata(CommandLine cmd) throws Exception {
// Construct ZK client
String veniceZookeeperUrl = getRequiredArgument(cmd, Arg.VENICE_ZOOKEEPER_URL, Command.RECOVER_STORE_METADATA);
// Check SSL configs in JVM system arguments for ZK
String zkSSLFile = getRequiredArgument(cmd, Arg.ZK_SSL_CONFIG_FILE, Command.RECOVER_STORE_METADATA);
String zkSSLFile = getOptionalArgument(cmd, Arg.ZK_SSL_CONFIG_FILE, "");
ZkClient zkClient = readZKConfigAndBuildZKClient(veniceZookeeperUrl, zkSSLFile);

String consumerConfigFile = getOptionalArgument(cmd, Arg.KAFKA_CONSUMER_CONFIG_FILE, "");
// Construct consumer to dump admin message
Properties consumerProperties = loadProperties(cmd, Arg.KAFKA_CONSUMER_CONFIG_FILE);
Properties consumerProperties =
consumerConfigFile.isEmpty() ? new Properties() : loadProperties(cmd, Arg.KAFKA_CONSUMER_CONFIG_FILE);
String pubSubBrokerUrl = getRequiredArgument(cmd, Arg.KAFKA_BOOTSTRAP_SERVERS, Command.RECOVER_STORE_METADATA);
consumerProperties = DumpAdminMessages.getPubSubConsumerProperties(pubSubBrokerUrl, consumerProperties);
PubSubConsumerAdapter consumer = getConsumer(consumerProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -493,9 +493,8 @@ public enum Command {

RECOVER_STORE_METADATA(
"recover-store-metadata", "Recover store metadata in EI",
new Arg[] { URL, STORE, VENICE_ZOOKEEPER_URL, ZK_SSL_CONFIG_FILE, KAFKA_BOOTSTRAP_SERVERS,
KAFKA_CONSUMER_CONFIG_FILE, GRAVEYARD_CLUSTERS },
new Arg[] { RECOVER_CLUSTER, SKIP_LAST_STORE_CREATION, REPAIR }
new Arg[] { URL, STORE, VENICE_ZOOKEEPER_URL, KAFKA_BOOTSTRAP_SERVERS, GRAVEYARD_CLUSTERS },
new Arg[] { ZK_SSL_CONFIG_FILE, KAFKA_CONSUMER_CONFIG_FILE, RECOVER_CLUSTER, SKIP_LAST_STORE_CREATION, REPAIR }
),
BACKUP_STORE_METADATA_FROM_GRAVEYARD(
"backup-store-metadata-from-graveyard", "Backup store metadata from graveyard in EI",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package com.linkedin.venice.endToEnd;

import static com.linkedin.venice.ConfigKeys.CONTROLLER_AUTO_MATERIALIZE_DAVINCI_PUSH_STATUS_SYSTEM_STORE;
import static com.linkedin.venice.ConfigKeys.CONTROLLER_AUTO_MATERIALIZE_META_SYSTEM_STORE;
import static com.linkedin.venice.ConfigKeys.OFFLINE_JOB_START_TIMEOUT_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS;
import static com.linkedin.venice.ConfigKeys.TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import com.linkedin.venice.AdminTool;
import com.linkedin.venice.common.VeniceSystemStoreUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.ControllerClientFactory;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.UpdateStoreQueryParams;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.integration.utils.ServiceFactory;
import com.linkedin.venice.integration.utils.VeniceMultiClusterWrapper;
import com.linkedin.venice.integration.utils.VeniceTwoLayerMultiRegionMultiClusterWrapper;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


public class StoreMetadataRecoveryTest {
private static final int TEST_TIMEOUT = 120 * Time.MS_PER_SECOND;

private VeniceTwoLayerMultiRegionMultiClusterWrapper twoLayerClusterWrapper;
private VeniceMultiClusterWrapper multiClusterWrapper;
private String clusterName;
private String parentControllerUrl;
private String parentZKUrl;
private String parentKafkaUrl;
private String childControllerUrl;

@BeforeClass
public void setUp() {
Utils.thisIsLocalhost();
Properties parentControllerProperties = new Properties();
// Disable topic cleanup since parent and child are sharing the same kafka cluster.
parentControllerProperties
.setProperty(TOPIC_CLEANUP_SLEEP_INTERVAL_BETWEEN_TOPIC_LIST_FETCH_MS, String.valueOf(Long.MAX_VALUE));
parentControllerProperties.setProperty(OFFLINE_JOB_START_TIMEOUT_MS, "180000");
parentControllerProperties.setProperty(CONTROLLER_AUTO_MATERIALIZE_META_SYSTEM_STORE, "false");
parentControllerProperties.setProperty(CONTROLLER_AUTO_MATERIALIZE_DAVINCI_PUSH_STATUS_SYSTEM_STORE, "false");

Properties serverProperties = new Properties();
serverProperties.put(SERVER_PROMOTION_TO_LEADER_REPLICA_DELAY_SECONDS, 1L);

// 1 parent controller, 1 child region, 1 clusters per child region, 2 servers per cluster
twoLayerClusterWrapper = ServiceFactory.getVeniceTwoLayerMultiRegionMultiClusterWrapper(
1,
1,
1,
1,
2,
0,
2,
Optional.of(parentControllerProperties),
Optional.empty(),
Optional.of(serverProperties),
false);

multiClusterWrapper = twoLayerClusterWrapper.getChildRegions().get(0);
String[] clusterNames = multiClusterWrapper.getClusterNames();
Arrays.sort(clusterNames);
clusterName = clusterNames[0]; // venice-cluster0
parentControllerUrl = twoLayerClusterWrapper.getControllerConnectString();
parentZKUrl = twoLayerClusterWrapper.getZkServerWrapper().getAddress();
parentKafkaUrl = twoLayerClusterWrapper.getParentKafkaBrokerWrapper().getAddress();
childControllerUrl = multiClusterWrapper.getControllerConnectString();

for (String cluster: clusterNames) {
try (ControllerClient controllerClient = new ControllerClient(cluster, childControllerUrl)) {
// Verify the participant store is up and running in child region
String participantStoreName = VeniceSystemStoreUtils.getParticipantStoreNameForCluster(cluster);
TestUtils.waitForNonDeterministicPushCompletion(
Version.composeKafkaTopic(participantStoreName, 1),
controllerClient,
5,
TimeUnit.MINUTES);
}
}
}

@AfterClass(alwaysRun = true)
public void cleanUp() {
Utils.closeQuietlyWithErrorLogged(twoLayerClusterWrapper);
}

@Test(timeOut = TEST_TIMEOUT)
public void testStoreMetadataRecoveryAfterDeletion() throws Exception {
// Create two stores and delete one of them
ControllerClient parentControllerClient =
ControllerClientFactory.getControllerClient(clusterName, parentControllerUrl, Optional.empty());
ControllerClient childControllerClient =
ControllerClientFactory.getControllerClient(clusterName, childControllerUrl, Optional.empty());
String storeName1 = TestUtils.getUniqueTopicString("store_metadata_recovery");
String storeName2 = TestUtils.getUniqueTopicString("store_metadata_recovery");
String keySchemaStr = "\"string\"";
String simpleValueSchemaStr = "\"string\"";
String recordValueSchemaStr1 =
"{\n" + " \"name\": \"ValueRecord\",\n" + " \"type\": \"record\",\n" + " \"fields\": [\n"
+ " {\"name\": \"string_field\", \"type\": \"string\", \"default\": \"\"}\n" + " ]\n" + "}";
String recordValueSchemaStr2 = "{\n" + " \"name\": \"ValueRecord\",\n" + " \"type\": \"record\",\n"
+ " \"fields\": [\n" + " {\"name\": \"string_field\", \"type\": \"string\", \"default\": \"\"},\n"
+ " {\"name\": \"int_field\", \"type\": \"int\", \"default\": 10}\n" + " ]\n" + "}";

ControllerResponse controllerResponse =
parentControllerClient.createNewStore(storeName1, "test_owner", keySchemaStr, simpleValueSchemaStr);
if (controllerResponse.isError()) {
throw new VeniceException(
"Failed to create store: " + storeName1 + " with error: " + controllerResponse.getError());
}

controllerResponse =
parentControllerClient.createNewStore(storeName2, "test_owner", keySchemaStr, recordValueSchemaStr1);
if (controllerResponse.isError()) {
throw new VeniceException(
"Failed to create store: " + storeName2 + " with error: " + controllerResponse.getError());
}
controllerResponse = parentControllerClient.addValueSchema(storeName2, recordValueSchemaStr2);
if (controllerResponse.isError()) {
throw new VeniceException(
"Failed to add value schema to store: " + storeName2 + " with error: " + controllerResponse.getError());
}
controllerResponse = parentControllerClient.updateStore(
storeName2,
new UpdateStoreQueryParams().setHybridOffsetLagThreshold(100).setHybridRewindSeconds(100));
if (controllerResponse.isError()) {
throw new VeniceException(
"Failed to update store: " + storeName2 + " with error: " + controllerResponse.getError());
}
// Make sure both stores are available in child cluster
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
StoreResponse storeResponse = childControllerClient.getStore(storeName1);
assertFalse(storeResponse.isError(), storeName1 + " should present in child region");
storeResponse = childControllerClient.getStore(storeName2);
assertFalse(storeResponse.isError(), storeName2 + " should present in child region");
});

// delete store2
controllerResponse = parentControllerClient.disableAndDeleteStore(storeName2);
if (controllerResponse.isError()) {
throw new VeniceException(
"Failed to delete store: " + storeName2 + " with error: " + controllerResponse.getError());
}
// Make sure both stores are available in child cluster
TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
StoreResponse storeResponse = childControllerClient.getStore(storeName2);

assertTrue(storeResponse.isError(), storeName2 + " shouldn't present in child region");
});

// Recover the deleted stores via admin tool
String[] storeRecovery = { "--recover-store-metadata", "--url", parentControllerUrl, "--venice-zookeeper-url",
parentZKUrl + "/", "--kafka-bootstrap-servers", parentKafkaUrl, "--store", storeName2, "--repair", "true",
"--graveyard-clusters", clusterName };
AdminTool.main(storeRecovery);

// Make sure the store is recovered with the right config and schema
StoreResponse storeResponse = parentControllerClient.getStore(storeName2);
if (storeResponse.isError()) {
throw new VeniceException("Failed to retrieve store info for store: " + storeName2);
}
// Check hybrid config
assertEquals(storeResponse.getStore().getHybridStoreConfig().getRewindTimeInSeconds(), 100);
assertEquals(storeResponse.getStore().getHybridStoreConfig().getOffsetLagThresholdToGoOnline(), 100);
// Verify schemas
SchemaResponse keySchemaResponse = parentControllerClient.getKeySchema(storeName2);
assertEquals(keySchemaResponse.getSchemaStr(), keySchemaStr);
MultiSchemaResponse valueSchemaResponse = parentControllerClient.getAllValueSchema(storeName2);
MultiSchemaResponse.Schema[] schemas = valueSchemaResponse.getSchemas();
assertEquals(schemas.length, 2);
assertEquals(Schema.parse(schemas[0].getSchemaStr()), Schema.parse(recordValueSchemaStr1));
assertEquals(Schema.parse(schemas[1].getSchemaStr()), Schema.parse(recordValueSchemaStr2));
}
}

0 comments on commit 7f61d66

Please sign in to comment.