From 610713b8ab4fd9725440fb8b0d4df3fda3fd7581 Mon Sep 17 00:00:00 2001 From: Felix GV Date: Fri, 7 Jun 2024 09:46:48 -0700 Subject: [PATCH] Trimmed down the copy-pasted code in the PubSubSecurityProtocol. --- .../consumer/KafkaStoreIngestionService.java | 2 +- .../pubsub/api/PubSubSecurityProtocol.java | 19 ++++--------------- .../admin/ApacheKafkaAdminConfigTest.java | 6 +++--- .../api/PubSubSecurityProtocolTest.java | 19 +++++++++++++++++++ .../utils/VeniceControllerWrapper.java | 2 +- .../utils/VeniceServerWrapper.java | 2 +- ...woLayerMultiRegionMultiClusterWrapper.java | 6 +++--- 7 files changed, 32 insertions(+), 24 deletions(-) diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java index 7a4dd4cb65a..108dadc2414 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java @@ -1116,7 +1116,7 @@ private VeniceProperties getPubSubSSLPropertiesFromServerConfig(String kafkaBoot } properties.putAll(sslConfig.get().getKafkaSSLConfig()); } - properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name); + properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol.name()); return new VeniceProperties(properties); } diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocol.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocol.java index 05f0e5d13b2..b09d63dabca 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocol.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocol.java @@ -11,24 +11,13 @@ */ public enum PubSubSecurityProtocol { /** Un-authenticated, non-encrypted channel */ - PLAINTEXT(0, "PLAINTEXT"), + PLAINTEXT, /** SSL channel */ - SSL(1, "SSL"), + SSL, /** SASL authenticated, non-encrypted channel */ - SASL_PLAINTEXT(2, "SASL_PLAINTEXT"), + SASL_PLAINTEXT, /** SASL authenticated, SSL channel */ - SASL_SSL(3, "SASL_SSL"); - - /** The permanent and immutable id of a security protocol -- this can't change, and must match kafka.cluster.SecurityProtocol */ - public final short id; - - /** Name of the security protocol. This may be used by client configuration. */ - public final String name; - - PubSubSecurityProtocol(int id, String name) { - this.id = (short) id; - this.name = name; - } + SASL_SSL; public static PubSubSecurityProtocol forName(String name) { return PubSubSecurityProtocol.valueOf(name.toUpperCase(Locale.ROOT)); diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java index 5091e0fd019..1f2a998d7ba 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/adapter/kafka/admin/ApacheKafkaAdminConfigTest.java @@ -35,8 +35,8 @@ private void testSetupSaslInKafkaAdmin(PubSubSecurityProtocol securityProtocol) properties.put("kafka.bootstrap.servers", "localhost:9092"); properties.put("kafka.sasl.jaas.config", SASL_JAAS_CONFIG); properties.put("kafka.sasl.mechanism", SASL_MECHANISM); - properties.put("kafka.security.protocol", securityProtocol.name); - if (securityProtocol.name.contains("SSL")) { + properties.put("kafka.security.protocol", securityProtocol.name()); + if (securityProtocol.name().contains("SSL")) { properties.put("ssl.truststore.location", "-"); properties.put("ssl.truststore.password", ""); properties.put("ssl.truststore.type", "JKS"); @@ -49,7 +49,7 @@ private void testSetupSaslInKafkaAdmin(PubSubSecurityProtocol securityProtocol) Properties adminProperties = serverConfig.getAdminProperties(); assertEquals(SASL_JAAS_CONFIG, adminProperties.get("sasl.jaas.config")); assertEquals(SASL_MECHANISM, adminProperties.get("sasl.mechanism")); - assertEquals(securityProtocol.name, adminProperties.get("security.protocol")); + assertEquals(securityProtocol.name(), adminProperties.get("security.protocol")); } @Test diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocolTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocolTest.java index 75029fa2739..5edbd018e28 100644 --- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocolTest.java +++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubSecurityProtocolTest.java @@ -64,4 +64,23 @@ public void testInstantiation() { assertEquals(specificEnum.name(), value.name()); } } + + @Test + public void testForName() { + assertEquals(PubSubSecurityProtocol.forName("plaintext"), PubSubSecurityProtocol.PLAINTEXT); + assertEquals(PubSubSecurityProtocol.forName("PLAINTEXT"), PubSubSecurityProtocol.PLAINTEXT); + assertEquals(PubSubSecurityProtocol.forName("Plaintext"), PubSubSecurityProtocol.PLAINTEXT); + + assertEquals(PubSubSecurityProtocol.forName("ssl"), PubSubSecurityProtocol.SSL); + assertEquals(PubSubSecurityProtocol.forName("SSL"), PubSubSecurityProtocol.SSL); + assertEquals(PubSubSecurityProtocol.forName("Ssl"), PubSubSecurityProtocol.SSL); + + assertEquals(PubSubSecurityProtocol.forName("sasl_plaintext"), PubSubSecurityProtocol.SASL_PLAINTEXT); + assertEquals(PubSubSecurityProtocol.forName("SASL_PLAINTEXT"), PubSubSecurityProtocol.SASL_PLAINTEXT); + assertEquals(PubSubSecurityProtocol.forName("Sasl_Plaintext"), PubSubSecurityProtocol.SASL_PLAINTEXT); + + assertEquals(PubSubSecurityProtocol.forName("sasl_ssl"), PubSubSecurityProtocol.SASL_SSL); + assertEquals(PubSubSecurityProtocol.forName("SASL_SSL"), PubSubSecurityProtocol.SASL_SSL); + assertEquals(PubSubSecurityProtocol.forName("Sasl_Ssl"), PubSubSecurityProtocol.SASL_SSL); + } } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java index 9cf85442128..f79e7b643e9 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceControllerWrapper.java @@ -228,7 +228,7 @@ static StatefulServiceProvider generateService(VeniceCo } if (options.isSslToKafka()) { - builder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name); + builder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name()); builder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration())); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java index 04fa96c93cd..95f9e8d5fb8 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceServerWrapper.java @@ -259,7 +259,7 @@ static StatefulServiceProvider generateService( .put(SERVER_INGESTION_HEARTBEAT_INTERVAL_MS, 5000) .put(SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS, 5000); if (sslToKafka) { - serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name); + serverPropsBuilder.put(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.SSL.name()); serverPropsBuilder.put(KafkaTestUtils.getLocalCommonKafkaSSLConfig(SslUtils.getTlsConfiguration())); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java index 492aa495661..45b1683b865 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/integration/utils/VeniceTwoLayerMultiRegionMultiClusterWrapper.java @@ -288,8 +288,8 @@ private static Map> addKafkaClusterIDMappingToServer List regionNames, List kafkaBrokers) { if (serverProperties.isPresent()) { - PubSubSecurityProtocol baseSecurityProtocol = PubSubSecurityProtocol - .valueOf(serverProperties.get().getProperty(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name)); + PubSubSecurityProtocol baseSecurityProtocol = PubSubSecurityProtocol.valueOf( + serverProperties.get().getProperty(KAFKA_SECURITY_PROTOCOL, PubSubSecurityProtocol.PLAINTEXT.name())); Map> kafkaClusterMap = new HashMap<>(); Map mapping; @@ -302,7 +302,7 @@ private static Map> addKafkaClusterIDMappingToServer // Testing mixed security on any 2-layer setup with 2 or more DCs. securityProtocol = PubSubSecurityProtocol.SSL; } - mapping.put(KAFKA_CLUSTER_MAP_SECURITY_PROTOCOL, securityProtocol.name); + mapping.put(KAFKA_CLUSTER_MAP_SECURITY_PROTOCOL, securityProtocol.name()); // N.B. the first Kafka broker in the list is the parent, which we're excluding from the mapping, so this // is why the index here is offset by 1 compared to the cluster ID.