Skip to content

Commit

Permalink
Trimmed down the copy-pasted code in the PubSubSecurityProtocol.
Browse files Browse the repository at this point in the history
  • Loading branch information
FelixGV committed Jun 7, 2024
1 parent 4c01770 commit 610713b
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1116,7 +1116,7 @@ private VeniceProperties getPubSubSSLPropertiesFromServerConfig(String kafkaBoot
}
properties.putAll(sslConfig.get().getKafkaSSLConfig());
}
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name);
properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name());
return new VeniceProperties(properties);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,13 @@
*/
public enum PubSubSecurityProtocol {
/** Un-authenticated, non-encrypted channel */
PLAINTEXT(0, "PLAINTEXT"),
PLAINTEXT,
/** SSL channel */
SSL(1, "SSL"),
SSL,
/** SASL authenticated, non-encrypted channel */
SASL_PLAINTEXT(2, "SASL_PLAINTEXT"),
SASL_PLAINTEXT,
/** SASL authenticated, SSL channel */
SASL_SSL(3, "SASL_SSL");

/** The permanent and immutable id of a security protocol -- this can't change, and must match kafka.cluster.SecurityProtocol */
public final short id;

/** Name of the security protocol. This may be used by client configuration. */
public final String name;

PubSubSecurityProtocol(int id, String name) {
this.id = (short) id;
this.name = name;
}
SASL_SSL;

public static PubSubSecurityProtocol forName(String name) {
return PubSubSecurityProtocol.valueOf(name.toUpperCase(Locale.ROOT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ private void testSetupSaslInKafkaAdmin(PubSubSecurityProtocol securityProtocol)
properties.put("kafka.bootstrap.servers", "localhost:9092");
properties.put("kafka.sasl.jaas.config", SASL_JAAS_CONFIG);
properties.put("kafka.sasl.mechanism", SASL_MECHANISM);
properties.put("kafka.security.protocol", securityProtocol.name);
if (securityProtocol.name.contains("SSL")) {
properties.put("kafka.security.protocol", securityProtocol.name());
if (securityProtocol.name().contains("SSL")) {
properties.put("ssl.truststore.location", "-");
properties.put("ssl.truststore.password", "");
properties.put("ssl.truststore.type", "JKS");
Expand All @@ -49,7 +49,7 @@ private void testSetupSaslInKafkaAdmin(PubSubSecurityProtocol securityProtocol)
Properties adminProperties = serverConfig.getAdminProperties();
assertEquals(SASL_JAAS_CONFIG, adminProperties.get("sasl.jaas.config"));
assertEquals(SASL_MECHANISM, adminProperties.get("sasl.mechanism"));
assertEquals(securityProtocol.name, adminProperties.get("security.protocol"));
assertEquals(securityProtocol.name(), adminProperties.get("security.protocol"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,23 @@ public void testInstantiation() {
assertEquals(specificEnum.name(), value.name());
}
}

@Test
public void testForName() {
assertEquals(PubSubSecurityProtocol.forName("plaintext"), PubSubSecurityProtocol.PLAINTEXT);
assertEquals(PubSubSecurityProtocol.forName("PLAINTEXT"), PubSubSecurityProtocol.PLAINTEXT);
assertEquals(PubSubSecurityProtocol.forName("Plaintext"), PubSubSecurityProtocol.PLAINTEXT);

assertEquals(PubSubSecurityProtocol.forName("ssl"), PubSubSecurityProtocol.SSL);
assertEquals(PubSubSecurityProtocol.forName("SSL"), PubSubSecurityProtocol.SSL);
assertEquals(PubSubSecurityProtocol.forName("Ssl"), PubSubSecurityProtocol.SSL);

assertEquals(PubSubSecurityProtocol.forName("sasl_plaintext"), PubSubSecurityProtocol.SASL_PLAINTEXT);
assertEquals(PubSubSecurityProtocol.forName("SASL_PLAINTEXT"), PubSubSecurityProtocol.SASL_PLAINTEXT);
assertEquals(PubSubSecurityProtocol.forName("Sasl_Plaintext"), PubSubSecurityProtocol.SASL_PLAINTEXT);

assertEquals(PubSubSecurityProtocol.forName("sasl_ssl"), PubSubSecurityProtocol.SASL_SSL);
assertEquals(PubSubSecurityProtocol.forName("SASL_SSL"), PubSubSecurityProtocol.SASL_SSL);
assertEquals(PubSubSecurityProtocol.forName("Sasl_Ssl"), PubSubSecurityProtocol.SASL_SSL);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ static StatefulServiceProvider<VeniceControllerWrapper> generateService(VeniceCo
}

if (options.isSslToKafka()) {
builder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name);
builder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name());
builder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ static StatefulServiceProvider<VeniceServerWrapper> generateService(
.put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 5000)
.put(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, 5000);
if (sslToKafka) {
serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name);
serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name());
serverPropsBuilder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,8 @@ private static Map<String, Map<String, String>> addKafkaClusterIDMappingToServer
List<String> regionNames,
List<PubSubBrokerWrapper> kafkaBrokers) {
if (serverProperties.isPresent()) {
PubSubSecurityProtocol baseSecurityProtocol = PubSubSecurityProtocol
.valueOf(serverProperties.get().getProperty(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name));
PubSubSecurityProtocol baseSecurityProtocol = PubSubSecurityProtocol.valueOf(
serverProperties.get().getProperty(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name()));
Map<String, Map<String, String>> kafkaClusterMap = new HashMap<>();

Map<String, String> mapping;
Expand All @@ -302,7 +302,7 @@ private static Map<String, Map<String, String>> addKafkaClusterIDMappingToServer
// Testing mixed security on any 2-layer setup with 2 or more DCs.
securityProtocol = PubSubSecurityProtocol.SSL;
}
mapping.put(KAFKA_CLUSTER_MAP_SECURITY_PROTOCOL, securityProtocol.name);
mapping.put(KAFKA_CLUSTER_MAP_SECURITY_PROTOCOL, securityProtocol.name());

// N.B. the first Kafka broker in the list is the parent, which we're excluding from the mapping, so this
// is why the index here is offset by 1 compared to the cluster ID.
Expand Down

0 comments on commit 610713b

Please sign in to comment.