Skip to content

Commit

Permalink
[admin-tool] Added command to recover stores from graveyard and admin…
Browse files Browse the repository at this point in the history
… channel (#822)

* [admin-tool] Added command to recover stores from graveyard and admin channel

When we accidentally remove stores that are still in use from the Venice platform, we
can use the newly added command to recover their metadata and schemas, but not their data.
The idea is as follows:
1. If the store already exists, the command will exit.
2. If the store is not found in the store graveyard, it is not recoverable.
3. Then retrieve the store config from graveyard.
4. Scan the corresponding admin channel to collect the latest key/value schemas.
5. Create the store, register the schemas and then update the store configs.

Sample command:
java -jar venice-admin-tool-all.jar --recover-store-metadata --url ${parent.controller.url} --venice-zookeeper-url {parent zookeeper url} --zk-ssl-config-file ./zk.ssl --kafka-bootstrap-servers {Parent Kafka url} --kafka-consumer-config-file ./ssl.config --store {store_name} --repair true --graveyard-clusters {cluster1,cluster2}

Sample zk.ssl:
zookeeper.client.secure=true
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.keyStore.location=./identity.p12
zookeeper.ssl.keyStore.password=xxxx
zookeeper.ssl.keyStore.type=PKCS12
zookeeper.ssl.trustStore.location=./cacerts
zookeeper.ssl.trustStore.password=xxxxx
zookeeper.ssl.trustStore.type=JKS
jute.maxbuffer=4194304

Sample ssl.config:
security.protocol=SSL
ssl.protocol=TLS
ssl.trustmanager.algorithm=SunX509
ssl.keymanager.algorithm=SunX509
ssl.keystore.type=pkcs12
ssl.keystore.location=./identity.p12
ssl.keystore.password=xxxx
ssl.key.password=xxxx
ssl.truststore.type=JKS
ssl.truststore.location=./cacerts
ssl.truststore.password=xxxx
ssl.secure.random.implementation=SHA1PRNG

We can use the existing command in AdminTool to run an empty push against the recovered store if necessary.

* Fixed spotbugs issue

* more tweaks

* Disabled unit test coverage threshold in admin-tool and added an integration test

* excluded admin-tool diff coverage
  • Loading branch information
gaojieliu authored Jan 24, 2024
1 parent 5fdee1f commit 7e43d93
Show file tree
Hide file tree
Showing 8 changed files with 671 additions and 44 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,9 @@ ext.createDiffFile = { ->
// venice-standalone
':!services/venice-standalone/*', // exclude the entire standalone project

// admin-tool
':!clients/venice-admin-tool/*',

// Keep this last
// Other files that have tests but are not executed in the regular unit test task
':!internal/alpini/*'
Expand Down
2 changes: 1 addition & 1 deletion clients/venice-admin-tool/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ jar {
}

ext {
jacocoCoverageThreshold = 0.04
jacocoCoverageThreshold = 0.00
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.zookeeper.impl.client.ZkClient;


public class AdminTool {
Expand Down Expand Up @@ -545,6 +546,12 @@ public static void main(String args[]) throws Exception {
case CONFIGURE_STORE_VIEW:
configureStoreView(cmd);
break;
case BACKUP_STORE_METADATA_FROM_GRAVEYARD:
backupStoreMetadataFromGraveyard(cmd);
break;
case RECOVER_STORE_METADATA:
recoverStoreMetadata(cmd);
break;
default:
StringJoiner availableCommands = new StringJoiner(", ");
for (Command c: Command.values()) {
Expand Down Expand Up @@ -1183,6 +1190,100 @@ private static void applyValueSchemaToStore(CommandLine cmd) throws Exception {
printObject(valueResponse);
}

/**
 * Builds a ZK client for {@code veniceZookeeperUrl}, optionally loading ZK SSL settings first.
 *
 * If {@code zkSSLFile} is non-empty, each {@code key=value} line in the file is copied into the
 * JVM system properties (ZooKeeper reads its SSL configuration from system properties), and the
 * presence of every property in {@code REQUIRED_ZK_SSL_SYSTEM_PROPERTIES} is verified.
 *
 * @param veniceZookeeperUrl ZooKeeper connection string to connect to
 * @param zkSSLFile path to a properties-style ZK SSL config file; empty string means "no SSL config"
 * @return a connected {@link ZkClient}; the caller owns it and must close it
 * @throws VeniceException if a line is malformed or a required SSL property is missing
 */
private static ZkClient readZKConfigAndBuildZKClient(String veniceZookeeperUrl, String zkSSLFile) throws Exception {
  if (!zkSSLFile.isEmpty()) {
    Properties systemProperties = System.getProperties();
    try (BufferedReader br =
        new BufferedReader(new InputStreamReader(new FileInputStream(zkSSLFile), StandardCharsets.UTF_8))) {
      String newLine = br.readLine();
      while (newLine != null) {
        String trimmedLine = newLine.trim();
        // Tolerate blank lines and '#' comments, which are legal in standard properties files.
        if (!trimmedLine.isEmpty() && !trimmedLine.startsWith("#")) {
          // Split on the FIRST '=' only: values such as base64-encoded passwords can themselves
          // contain '=' and must not be rejected.
          String[] tokens = trimmedLine.split("=", 2);
          if (tokens.length != 2 || tokens[0].isEmpty()) {
            throw new VeniceException(
                "ZK SSL config file format is incorrect: " + newLine
                    + "\nZK SSL config file content example: zookeeper.client.secure=true");
          }
          systemProperties.put(tokens[0], tokens[1]);
        }
        newLine = br.readLine();
      }
    }
    // Verified all required ZK SSL configs are present
    for (String requiredZKSSLProperty: REQUIRED_ZK_SSL_SYSTEM_PROPERTIES) {
      if (!systemProperties.containsKey(requiredZKSSLProperty)) {
        throw new VeniceException("Missing required ZK SSL property: " + requiredZKSSLProperty);
      }
    }
    System.setProperties(systemProperties);
  }
  return ZkClientFactory.newZkClient(veniceZookeeperUrl);
}

/**
 * Dumps the store graveyard metadata of the given clusters from ZooKeeper into a local backup folder.
 *
 * Required args: {@link Arg#BACKUP_FOLDER}, {@link Arg#VENICE_ZOOKEEPER_URL},
 * {@link Arg#GRAVEYARD_CLUSTERS}, {@link Arg#ZK_SSL_CONFIG_FILE}.
 *
 * @throws VeniceException if the graveyard clusters argument is blank
 */
private static void backupStoreMetadataFromGraveyard(CommandLine cmd) throws Exception {
  String backupFolderPath = getRequiredArgument(cmd, Arg.BACKUP_FOLDER, Command.BACKUP_STORE_METADATA_FROM_GRAVEYARD);
  // Construct ZK client
  String veniceZookeeperUrl =
      getRequiredArgument(cmd, Arg.VENICE_ZOOKEEPER_URL, Command.BACKUP_STORE_METADATA_FROM_GRAVEYARD);
  // Pass the command so a missing argument produces a message naming it, consistent with the
  // other required-argument lookups in this method (previously an empty error clause was passed).
  String graveyardClusters =
      getRequiredArgument(cmd, Arg.GRAVEYARD_CLUSTERS, Command.BACKUP_STORE_METADATA_FROM_GRAVEYARD).trim();
  if (graveyardClusters.isEmpty()) {
    throw new VeniceException("Graveyard clusters argument shouldn't be empty");
  }
  List<String> graveyardClusterList =
      Arrays.stream(graveyardClusters.split(",")).map(s -> s.trim()).collect(Collectors.toList());
  String zkSSLFile = getRequiredArgument(cmd, Arg.ZK_SSL_CONFIG_FILE, Command.BACKUP_STORE_METADATA_FROM_GRAVEYARD);

  ZkClient zkClient = readZKConfigAndBuildZKClient(veniceZookeeperUrl, zkSSLFile);
  try {
    RecoverStoreMetadata.backupStoreGraveyard(zkClient, graveyardClusterList, backupFolderPath);
  } finally {
    // Always release the ZK connection, even if the backup fails.
    zkClient.close();
  }
}

/**
 * Recovers a deleted store's metadata and schemas (not its data) from the store graveyard plus the
 * admin channel, then optionally re-creates/repairs the store via the parent controller.
 *
 * Required args: {@link Arg#STORE}, {@link Arg#URL}, {@link Arg#GRAVEYARD_CLUSTERS},
 * {@link Arg#VENICE_ZOOKEEPER_URL}, {@link Arg#KAFKA_BOOTSTRAP_SERVERS}.
 * Optional: {@link Arg#SKIP_LAST_STORE_CREATION}, {@link Arg#REPAIR}, {@link Arg#RECOVER_CLUSTER},
 * {@link Arg#ZK_SSL_CONFIG_FILE}, {@link Arg#KAFKA_CONSUMER_CONFIG_FILE}.
 *
 * @throws VeniceException if the graveyard clusters argument is blank
 */
private static void recoverStoreMetadata(CommandLine cmd) throws Exception {
  String store = getRequiredArgument(cmd, Arg.STORE, Command.RECOVER_STORE_METADATA);
  String url = getRequiredArgument(cmd, Arg.URL, Command.RECOVER_STORE_METADATA);
  boolean skipLastStoreCreation =
      Boolean.parseBoolean(getOptionalArgument(cmd, Arg.SKIP_LAST_STORE_CREATION, "false"));
  boolean doRepair = Boolean.parseBoolean(getOptionalArgument(cmd, Arg.REPAIR, "false"));
  String graveyardClusters = getRequiredArgument(cmd, Arg.GRAVEYARD_CLUSTERS, Command.RECOVER_STORE_METADATA).trim();
  if (graveyardClusters.isEmpty()) {
    throw new VeniceException("Graveyard clusters argument shouldn't be empty");
  }
  List<String> graveyardClusterList =
      Arrays.stream(graveyardClusters.split(",")).map(s -> s.trim()).collect(Collectors.toList());
  String recoverCluster = getOptionalArgument(cmd, Arg.RECOVER_CLUSTER, "");

  // Construct ZK client
  String veniceZookeeperUrl = getRequiredArgument(cmd, Arg.VENICE_ZOOKEEPER_URL, Command.RECOVER_STORE_METADATA);
  // Check SSL configs in JVM system arguments for ZK
  String zkSSLFile = getOptionalArgument(cmd, Arg.ZK_SSL_CONFIG_FILE, "");
  ZkClient zkClient = readZKConfigAndBuildZKClient(veniceZookeeperUrl, zkSSLFile);

  // From here on the ZK client is open: wrap consumer construction too, so zkClient cannot leak
  // if loading consumer properties or creating the consumer throws.
  PubSubConsumerAdapter consumer = null;
  try {
    String consumerConfigFile = getOptionalArgument(cmd, Arg.KAFKA_CONSUMER_CONFIG_FILE, "");
    // Construct consumer to dump admin message
    Properties consumerProperties =
        consumerConfigFile.isEmpty() ? new Properties() : loadProperties(cmd, Arg.KAFKA_CONSUMER_CONFIG_FILE);
    String pubSubBrokerUrl = getRequiredArgument(cmd, Arg.KAFKA_BOOTSTRAP_SERVERS, Command.RECOVER_STORE_METADATA);
    consumerProperties = DumpAdminMessages.getPubSubConsumerProperties(pubSubBrokerUrl, consumerProperties);
    consumer = getConsumer(consumerProperties);

    RecoverStoreMetadata.recover(
        zkClient,
        consumer,
        sslFactory,
        url,
        store,
        skipLastStoreCreation,
        doRepair,
        graveyardClusterList,
        recoverCluster);
  } finally {
    if (consumer != null) {
      consumer.close();
    }
    zkClient.close();
  }
}

private static void applyValueSchemaToZK(CommandLine cmd) throws Exception {
String store = getRequiredArgument(cmd, Arg.STORE, Command.ADD_SCHEMA_TO_ZK);
String cluster = getRequiredArgument(cmd, Arg.CLUSTER, Command.ADD_SCHEMA_TO_ZK);
Expand All @@ -1193,57 +1294,38 @@ private static void applyValueSchemaToZK(CommandLine cmd) throws Exception {
Arg.VALUE_SCHEMA_ID.toString());
// Check SSL configs in JVM system arguments for ZK
String zkSSLFile = getRequiredArgument(cmd, Arg.ZK_SSL_CONFIG_FILE, Command.ADD_SCHEMA_TO_ZK);
Properties systemProperties = System.getProperties();
try (BufferedReader br =
new BufferedReader(new InputStreamReader(new FileInputStream(zkSSLFile), StandardCharsets.UTF_8))) {
String newLine = br.readLine();
while (newLine != null) {
String[] tokens = newLine.split("=");
if (tokens.length != 2) {
System.err.println("ZK SSL config file format is incorrect: " + newLine);
System.err.println("ZK SSL config file content example: zookeeper.client.secure=true");
return;
}
systemProperties.put(tokens[0], tokens[1]);
newLine = br.readLine();
}
}
// Verified all required ZK SSL configs are present
for (String requiredZKSSLProperty: REQUIRED_ZK_SSL_SYSTEM_PROPERTIES) {
if (!systemProperties.containsKey(requiredZKSSLProperty)) {
System.err.println("Missing required ZK SSL property: " + requiredZKSSLProperty);
return;
}
}
System.setProperties(systemProperties);

String valueSchemaStr = readFile(valueSchemaFile);
verifyValidSchema(valueSchemaStr);
Schema newValueSchema = Schema.parse(valueSchemaStr);

HelixSchemaAccessor schemaAccessor =
new HelixSchemaAccessor(ZkClientFactory.newZkClient(veniceZookeeperUrl), new HelixAdapterSerializer(), cluster);
if (schemaAccessor.getValueSchema(store, String.valueOf(valueSchemaId)) != null) {
System.err.println(
"Schema version " + valueSchemaId + " is already registered in ZK for store " + store + ", do nothing!");
return;
}

// Check backward compatibility?
List<SchemaEntry> allValueSchemas = schemaAccessor.getAllValueSchemas(store);
for (SchemaEntry schemaEntry: allValueSchemas) {
SchemaCompatibility.SchemaPairCompatibility backwardCompatibility =
SchemaCompatibility.checkReaderWriterCompatibility(newValueSchema, schemaEntry.getSchema());
if (!backwardCompatibility.getType().equals(SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE)) {
ZkClient zkClient = readZKConfigAndBuildZKClient(veniceZookeeperUrl, zkSSLFile);
try {
HelixSchemaAccessor schemaAccessor = new HelixSchemaAccessor(zkClient, new HelixAdapterSerializer(), cluster);
if (schemaAccessor.getValueSchema(store, String.valueOf(valueSchemaId)) != null) {
System.err.println(
"New value schema for store " + store + " is not backward compatible with a previous schema version "
+ schemaEntry.getId() + ". Abort.");
"Schema version " + valueSchemaId + " is already registered in ZK for store " + store + ", do nothing!");
return;
}
}

// Register it
schemaAccessor.addValueSchema(store, new SchemaEntry(valueSchemaId, newValueSchema));
// Check backward compatibility?
List<SchemaEntry> allValueSchemas = schemaAccessor.getAllValueSchemas(store);
for (SchemaEntry schemaEntry: allValueSchemas) {
SchemaCompatibility.SchemaPairCompatibility backwardCompatibility =
SchemaCompatibility.checkReaderWriterCompatibility(newValueSchema, schemaEntry.getSchema());
if (!backwardCompatibility.getType().equals(SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE)) {
System.err.println(
"New value schema for store " + store + " is not backward compatible with a previous schema version "
+ schemaEntry.getId() + ". Abort.");
return;
}
}

// Register it
schemaAccessor.addValueSchema(store, new SchemaEntry(valueSchemaId, newValueSchema));
} finally {
zkClient.close();
}
}

private static void applyDerivedSchemaToStore(CommandLine cmd) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,14 @@ public enum Arg {
"interval", "itv", true,
"monitor data recovery progress at seconds close to the number specified by the interval parameter until tasks are finished"
), DATETIME("datetime", "dtm", true, "Date and time stamp (YYYY-MM-DDTHH:MM:SS) in UTC time zone for data recovery"),
SKIP_LAST_STORE_CREATION(
"skip-last-store-creation", "slsc", true,
"Skip last round of store creation and the following schema manipulation"
), REPAIR("repair", "re", true, "Repair the store"),
GRAVEYARD_CLUSTERS(
"graveyard-clusters", "gc", true, "Clusters to scan store graveyard to retrieve metadata, eg. cluster-1,cluster-2"
), RECOVER_CLUSTER("recover-cluster", "rc", true, "Cluster to recover from"),
BACKUP_FOLDER("backup-folder", "bf", true, "Backup folder path"),
DEBUG("debug", "d", false, "Print debugging messages for execute-data-recovery");

private final String argName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import static com.linkedin.venice.Arg.ALLOW_STORE_MIGRATION;
import static com.linkedin.venice.Arg.AMPLIFICATION_FACTOR;
import static com.linkedin.venice.Arg.AUTO_SCHEMA_REGISTER_FOR_PUSHJOB_ENABLED;
import static com.linkedin.venice.Arg.BACKUP_FOLDER;
import static com.linkedin.venice.Arg.BACKUP_STRATEGY;
import static com.linkedin.venice.Arg.BACKUP_VERSION_RETENTION_DAY;
import static com.linkedin.venice.Arg.BATCH_GET_LIMIT;
Expand Down Expand Up @@ -35,6 +36,7 @@
import static com.linkedin.venice.Arg.FABRIC_B;
import static com.linkedin.venice.Arg.FORCE;
import static com.linkedin.venice.Arg.FUTURE_VERSION_ETL_ENABLED;
import static com.linkedin.venice.Arg.GRAVEYARD_CLUSTERS;
import static com.linkedin.venice.Arg.HYBRID_BUFFER_REPLAY_POLICY;
import static com.linkedin.venice.Arg.HYBRID_DATA_REPLICATION_POLICY;
import static com.linkedin.venice.Arg.HYBRID_OFFSET_LAG;
Expand Down Expand Up @@ -80,16 +82,19 @@
import static com.linkedin.venice.Arg.READ_COMPUTATION_ENABLED;
import static com.linkedin.venice.Arg.READ_QUOTA;
import static com.linkedin.venice.Arg.RECOVERY_COMMAND;
import static com.linkedin.venice.Arg.RECOVER_CLUSTER;
import static com.linkedin.venice.Arg.REGIONS_FILTER;
import static com.linkedin.venice.Arg.REGULAR_VERSION_ETL_ENABLED;
import static com.linkedin.venice.Arg.REMOVE_VIEW;
import static com.linkedin.venice.Arg.REPAIR;
import static com.linkedin.venice.Arg.REPLICATE_ALL_CONFIGS;
import static com.linkedin.venice.Arg.REPLICATION_FACTOR;
import static com.linkedin.venice.Arg.RETRY;
import static com.linkedin.venice.Arg.RMD_CHUNKING_ENABLED;
import static com.linkedin.venice.Arg.SERVER_KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND;
import static com.linkedin.venice.Arg.SERVER_URL;
import static com.linkedin.venice.Arg.SKIP_DIV;
import static com.linkedin.venice.Arg.SKIP_LAST_STORE_CREATION;
import static com.linkedin.venice.Arg.SOURCE_FABRIC;
import static com.linkedin.venice.Arg.STARTING_OFFSET;
import static com.linkedin.venice.Arg.START_DATE;
Expand Down Expand Up @@ -484,6 +489,16 @@ public enum Command {
CONFIGURE_STORE_VIEW(
"configure-store-view", "Configure store view of a certain store", new Arg[] { URL, CLUSTER, STORE, VIEW_NAME },
new Arg[] { VIEW_CLASS, VIEW_PARAMS, REMOVE_VIEW }
),

RECOVER_STORE_METADATA(
"recover-store-metadata", "Recover store metadata in EI",
new Arg[] { URL, STORE, VENICE_ZOOKEEPER_URL, KAFKA_BOOTSTRAP_SERVERS, GRAVEYARD_CLUSTERS },
new Arg[] { ZK_SSL_CONFIG_FILE, KAFKA_CONSUMER_CONFIG_FILE, RECOVER_CLUSTER, SKIP_LAST_STORE_CREATION, REPAIR }
),
BACKUP_STORE_METADATA_FROM_GRAVEYARD(
"backup-store-metadata-from-graveyard", "Backup store metadata from graveyard in EI",
new Arg[] { VENICE_ZOOKEEPER_URL, ZK_SSL_CONFIG_FILE, BACKUP_FOLDER }
);

private final String commandName;
Expand Down
Loading

0 comments on commit 7e43d93

Please sign in to comment.