Skip to content

Commit

Permalink
[controller] Refactor HelixAdminClient APIs to accept strongly typed …
Browse files Browse the repository at this point in the history
…Helix objects (#1286)

Currently, HelixAdminClient APIs accept a String-to-String map to set on the cluster configs. Changed to use strongly typed Helix "ClusterConfig" object instead.

Also, moved the "CloudConfig" creation at time of parsing configs, and reusing the created object instead of creating it at time of cluster creation.
  • Loading branch information
nisargthakkar authored Nov 6, 2024
1 parent f1d16b2 commit ae6f5d1
Show file tree
Hide file tree
Showing 9 changed files with 301 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.PropertyKey;
import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.model.CloudConfig;
import org.apache.helix.model.CustomizedStateConfig;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.InstanceConfig;
Expand Down Expand Up @@ -381,4 +384,27 @@ public static void setupCustomizedStateConfig(HelixAdmin admin, String clusterNa
public static String instanceIdToUrl(String instanceId) {
return "https://" + instanceId.replace("_", ":");
}

public static CloudConfig getCloudConfig(
CloudProvider cloudProvider,
String cloudId,
List<String> cloudInfoSources,
String cloudInfoProcessorName) {
CloudConfig.Builder cloudConfigBuilder =
new CloudConfig.Builder().setCloudEnabled(true).setCloudProvider(cloudProvider);

if (!StringUtils.isEmpty(cloudId)) {
cloudConfigBuilder.setCloudID(cloudId);
}

if (cloudInfoSources != null && !cloudInfoSources.isEmpty()) {
cloudConfigBuilder.setCloudInfoSources(cloudInfoSources);
}

if (!StringUtils.isEmpty(cloudInfoProcessorName)) {
cloudConfigBuilder.setCloudInfoProcessorName(cloudInfoProcessorName);
}

return cloudConfigBuilder.build();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
package com.linkedin.venice.utils;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;

import com.linkedin.venice.meta.Instance;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.helix.HelixException;
import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.model.CloudConfig;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand All @@ -16,4 +26,37 @@ public void parsesHostnameFromInstanceName() {
Assert.assertEquals(instance2.getHost(), "host_name");
Assert.assertEquals(instance2.getPort(), 5678);
}

@Test
public void testGetCloudConfig() {
List<String> cloudInfoSources = new ArrayList<>();
cloudInfoSources.add("TestSource");

CloudConfig cloudConfig =
HelixUtils.getCloudConfig(CloudProvider.CUSTOMIZED, "NA", cloudInfoSources, "TestProcessor");

assertTrue(cloudConfig.isCloudEnabled());
assertEquals(cloudConfig.getCloudProvider(), "CUSTOMIZED");
assertEquals(cloudConfig.getCloudID(), "NA");
assertEquals(cloudConfig.getCloudInfoSources().size(), 1);
assertEquals(cloudConfig.getCloudInfoSources().get(0), "TestSource");
assertEquals(cloudConfig.getCloudInfoProcessorName(), "TestProcessor");
}

@Test
public void testGetCloudConfigWhenControllerCloudInfoSourcesNotSet() {
assertThrows(
HelixException.class,
() -> HelixUtils.getCloudConfig(CloudProvider.CUSTOMIZED, "NA", Collections.emptyList(), "TestProcessor"));
}

@Test
public void testGetCloudConfigWhenControllerCloudInfoProcessorNameNotSet() {
List<String> cloudInfoSources = new ArrayList<>();
cloudInfoSources.add("TestSource");

assertThrows(
HelixException.class,
() -> HelixUtils.getCloudConfig(CloudProvider.CUSTOMIZED, "NA", cloudInfoSources, ""));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
import com.linkedin.venice.utils.DaemonThreadFactory;
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.Time;
import com.linkedin.venice.utils.Utils;
import io.tehuti.metrics.MetricsRepository;
Expand All @@ -42,6 +43,8 @@
import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.model.CloudConfig;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -268,8 +271,8 @@ public Void call() {
client.createVeniceControllerCluster();
client.addClusterToGrandCluster("venice-controllers");
for (int i = 0; i < 10; i++) {
String clusterName = "cluster-" + String.valueOf(i);
client.createVeniceStorageCluster(clusterName, new HashMap<>());
String clusterName = "cluster-" + i;
client.createVeniceStorageCluster(clusterName, new ClusterConfig(clusterName));
client.addClusterToGrandCluster(clusterName);
client.addVeniceStorageClusterToControllerCluster(clusterName);
}
Expand Down Expand Up @@ -330,17 +333,15 @@ public void testCloudConfig() {
HelixAsAServiceWrapper helixAsAServiceWrapper = startAndWaitForHAASToBeAvailable(zk.getAddress())) {
VeniceControllerClusterConfig commonConfig = mock(VeniceControllerClusterConfig.class);
VeniceControllerMultiClusterConfig controllerMultiClusterConfig = mock(VeniceControllerMultiClusterConfig.class);
CloudProvider cloudProvider = CloudProvider.CUSTOMIZED;

List<String> cloudInfoSources = new ArrayList<>();
cloudInfoSources.add("TestSource");

when(commonConfig.isControllerClusterHelixCloudEnabled()).thenReturn(true);
when(commonConfig.isStorageClusterHelixCloudEnabled()).thenReturn(true);
when(commonConfig.getHelixCloudProvider()).thenReturn(cloudProvider);
when(commonConfig.getHelixCloudId()).thenReturn("NA");
when(commonConfig.getHelixCloudInfoSources()).thenReturn(cloudInfoSources);
when(commonConfig.getHelixCloudInfoProcessorName()).thenReturn("TestProcessor");
CloudConfig cloudConfig =
HelixUtils.getCloudConfig(CloudProvider.CUSTOMIZED, "NA", cloudInfoSources, "TestProcessor");
when(commonConfig.getHelixCloudConfig()).thenReturn(cloudConfig);

doReturn(helixAsAServiceWrapper.getZkAddress()).when(controllerMultiClusterConfig).getZkAddress();
doReturn(HelixAsAServiceWrapper.HELIX_SUPER_CLUSTER_NAME).when(controllerMultiClusterConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.List;
import java.util.Map;
import org.apache.helix.model.ClusterConfig;


/**
Expand Down Expand Up @@ -29,9 +30,9 @@ public interface HelixAdminClient {
/**
* Create and configure the Venice storage cluster.
* @param clusterName of the Venice storage cluster.
* @param helixClusterProperties to be applied to the new cluster.
* @param clusterConfig {@link ClusterConfig} for the new cluster.
*/
void createVeniceStorageCluster(String clusterName, Map<String, String> helixClusterProperties);
void createVeniceStorageCluster(String clusterName, ClusterConfig clusterConfig);

/**
* Check if the given Venice storage cluster's cluster resource is in the Venice controller cluster.
Expand Down Expand Up @@ -62,9 +63,9 @@ public interface HelixAdminClient {
/**
* Update some Helix cluster properties for the given cluster.
* @param clusterName of the cluster to be updated.
* @param helixClusterProperties to be applied to the given cluster.
* @param clusterConfig {@link ClusterConfig} for the new cluster.
*/
void updateClusterConfigs(String clusterName, Map<String, String> helixClusterProperties);
void updateClusterConfigs(String clusterName, ClusterConfig clusterConfig);

/**
* Disable or enable a list of partitions on an instance.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@
import static com.linkedin.venice.ConfigKeys.PUBSUB_TOPIC_MANAGER_METADATA_FETCHER_THREAD_POOL_SIZE;
import static com.linkedin.venice.ConfigKeys.PUSH_JOB_FAILURE_CHECKPOINTS_TO_DEFINE_USER_ERROR;
import static com.linkedin.venice.ConfigKeys.PUSH_JOB_STATUS_STORE_CLUSTER_NAME;
import static com.linkedin.venice.ConfigKeys.PUSH_MONITOR_TYPE;
import static com.linkedin.venice.ConfigKeys.PUSH_SSL_ALLOWLIST;
import static com.linkedin.venice.ConfigKeys.PUSH_SSL_WHITELIST;
import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_ENABLED;
Expand Down Expand Up @@ -190,8 +189,8 @@
import com.linkedin.venice.pubsub.PubSubAdminAdapterFactory;
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.pushmonitor.LeakedPushStatusCleanUpService;
import com.linkedin.venice.pushmonitor.PushMonitorType;
import com.linkedin.venice.status.BatchJobHeartbeatConfigs;
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.KafkaSSLUtils;
import com.linkedin.venice.utils.RegionUtils;
import com.linkedin.venice.utils.Time;
Expand All @@ -212,6 +211,7 @@
import java.util.stream.Collectors;
import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.model.CloudConfig;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -366,10 +366,7 @@ public class VeniceControllerClusterConfig {

private final boolean controllerClusterHelixCloudEnabled;
private final boolean storageClusterHelixCloudEnabled;
private final CloudProvider helixCloudProvider;
private final String helixCloudId;
private final List<String> helixCloudInfoSources;
private final String helixCloudInfoProcessorName;
private final CloudConfig helixCloudConfig;

private final boolean usePushStatusStoreForIncrementalPushStatusReads;

Expand Down Expand Up @@ -430,7 +427,6 @@ public class VeniceControllerClusterConfig {
private final boolean sslToKafka;
private final int helixSendMessageTimeoutMilliseconds;
private final int adminTopicReplicationFactor;
private final PushMonitorType pushMonitorType;

private final String kafkaSecurityProtocol;
// SSL related config
Expand Down Expand Up @@ -629,8 +625,6 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
this.pushSSLAllowlist = props.getListWithAlternative(PUSH_SSL_ALLOWLIST, PUSH_SSL_WHITELIST, new ArrayList<>());
this.helixRebalanceAlg = props.getString(HELIX_REBALANCE_ALG, CrushRebalanceStrategy.class.getName());
this.adminTopicReplicationFactor = props.getInt(ADMIN_TOPIC_REPLICATION_FACTOR, 3);
this.pushMonitorType =
PushMonitorType.valueOf(props.getString(PUSH_MONITOR_TYPE, PushMonitorType.WRITE_COMPUTE_STORE.name()));
if (adminTopicReplicationFactor < 1) {
throw new ConfigurationException(ADMIN_TOPIC_REPLICATION_FACTOR + " cannot be less than 1.");
}
Expand Down Expand Up @@ -910,25 +904,30 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
this.storageClusterHelixCloudEnabled = props.getBoolean(CONTROLLER_STORAGE_CLUSTER_HELIX_CLOUD_ENABLED, false);

if (controllerClusterHelixCloudEnabled || storageClusterHelixCloudEnabled) {
CloudProvider helixCloudProvider;
String controllerCloudProvider = props.getString(CONTROLLER_HELIX_CLOUD_PROVIDER).toUpperCase();
try {
this.helixCloudProvider = CloudProvider.valueOf(controllerCloudProvider);
helixCloudProvider = CloudProvider.valueOf(controllerCloudProvider);
} catch (IllegalArgumentException e) {
throw new VeniceException(
"Invalid Helix cloud provider: " + controllerCloudProvider + ". Must be one of: "
+ Arrays.toString(CloudProvider.values()));
}
} else {
this.helixCloudProvider = null;
}

this.helixCloudId = props.getString(CONTROLLER_HELIX_CLOUD_ID, "");
this.helixCloudInfoProcessorName = props.getString(CONTROLLER_HELIX_CLOUD_INFO_PROCESSOR_NAME, "");
String helixCloudId = props.getString(CONTROLLER_HELIX_CLOUD_ID, "");
String helixCloudInfoProcessorName = props.getString(CONTROLLER_HELIX_CLOUD_INFO_PROCESSOR_NAME, "");

List<String> helixCloudInfoSources;
if (props.getString(CONTROLLER_HELIX_CLOUD_INFO_SOURCES, "").isEmpty()) {
helixCloudInfoSources = Collections.emptyList();
} else {
helixCloudInfoSources = props.getList(CONTROLLER_HELIX_CLOUD_INFO_SOURCES);
}

if (props.getString(CONTROLLER_HELIX_CLOUD_INFO_SOURCES, "").isEmpty()) {
this.helixCloudInfoSources = Collections.emptyList();
helixCloudConfig = HelixUtils
.getCloudConfig(helixCloudProvider, helixCloudId, helixCloudInfoSources, helixCloudInfoProcessorName);
} else {
this.helixCloudInfoSources = props.getList(CONTROLLER_HELIX_CLOUD_INFO_SOURCES);
helixCloudConfig = null;
}

this.unregisterMetricForDeletedStoreEnabled = props.getBoolean(UNREGISTER_METRIC_FOR_DELETED_STORE_ENABLED, false);
Expand Down Expand Up @@ -1577,20 +1576,8 @@ public boolean isStorageClusterHelixCloudEnabled() {
return storageClusterHelixCloudEnabled;
}

public CloudProvider getHelixCloudProvider() {
return helixCloudProvider;
}

public String getHelixCloudId() {
return helixCloudId;
}

public List<String> getHelixCloudInfoSources() {
return helixCloudInfoSources;
}

public String getHelixCloudInfoProcessorName() {
return helixCloudInfoProcessorName;
public CloudConfig getHelixCloudConfig() {
return helixCloudConfig;
}

public boolean usePushStatusStoreForIncrementalPush() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6284,24 +6284,21 @@ private void createControllerClusterIfRequired() {

private void setupStorageClusterAsNeeded(String clusterName) {
if (!helixAdminClient.isVeniceStorageClusterCreated(clusterName)) {
Map<String, String> helixClusterProperties = new HashMap<>();
helixClusterProperties.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, String.valueOf(true));
long delayRebalanceTimeMs = multiClusterConfigs.getControllerConfig(clusterName).getDelayToRebalanceMS();
ClusterConfig helixClusterConfig = new ClusterConfig(clusterName);
helixClusterConfig.getRecord().setBooleanField(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, true);
VeniceControllerClusterConfig clusterConfigs = multiClusterConfigs.getControllerConfig(clusterName);
long delayRebalanceTimeMs = clusterConfigs.getDelayToRebalanceMS();
if (delayRebalanceTimeMs > 0) {
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.DELAY_REBALANCE_ENABLED.name(), String.valueOf(true));
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.DELAY_REBALANCE_TIME.name(), String.valueOf(delayRebalanceTimeMs));
helixClusterConfig.setRebalanceDelayTime(delayRebalanceTimeMs);
helixClusterConfig.setDelayRebalaceEnabled(true);
}
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(), String.valueOf(true));
helixClusterConfig.setPersistBestPossibleAssignment(true);
// Topology and fault zone type fields are used by CRUSH alg. Helix would apply the constrains on CRUSH alg to
// choose proper instance to hold the replica.
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), "/" + HelixUtils.TOPOLOGY_CONSTRAINT);
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.FAULT_ZONE_TYPE.name(), HelixUtils.TOPOLOGY_CONSTRAINT);
helixAdminClient.createVeniceStorageCluster(clusterName, helixClusterProperties);
helixClusterConfig.setTopology("/" + HelixUtils.TOPOLOGY_CONSTRAINT);
helixClusterConfig.setFaultZoneType(HelixUtils.TOPOLOGY_CONSTRAINT);

helixAdminClient.createVeniceStorageCluster(clusterName, helixClusterConfig);
}
if (!helixAdminClient.isClusterInGrandCluster(clusterName)) {
helixAdminClient.addClusterToGrandCluster(clusterName);
Expand Down Expand Up @@ -6334,8 +6331,8 @@ private void createClusterIfRequired(String clusterName) {
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.DELAY_REBALANCE_TIME.name(), String.valueOf(delayedTime));
}
// Topology and fault zone type fields are used by CRUSH alg. Helix would apply the constrains on CRUSH alg to
// choose proper instance to hold the replica.
// Topology and fault zone type fields are used by CRUSH/CRUSHED/WAGED/etc alg. Helix would apply the constrains on
// these alg to choose proper instance to hold the replica.
helixClusterProperties
.put(ClusterConfig.ClusterConfigProperty.TOPOLOGY.name(), "/" + HelixUtils.TOPOLOGY_CONSTRAINT);
helixClusterProperties
Expand Down
Loading

0 comments on commit ae6f5d1

Please sign in to comment.