diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java
index 4af73ca4a1..03fadb6530 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java
@@ -242,7 +242,8 @@ public DaVinciBackend(
         true,
         // TODO: consider how/if a repair task would be valid for Davinci users?
         null,
-        pubSubClientsFactory);
+        pubSubClientsFactory,
+        Optional.empty());
     ingestionService.start();
     ingestionService.addIngestionNotifier(ingestionListener);
diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java
index 57c6b661ea..3645204618 100644
--- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java
+++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/config/VeniceServerConfig.java
@@ -22,6 +22,7 @@
 import static com.linkedin.venice.ConfigKeys.KAFKA_READ_ONLY_ADMIN_CLASS;
 import static com.linkedin.venice.ConfigKeys.KAFKA_WRITE_ONLY_ADMIN_CLASS;
 import static com.linkedin.venice.ConfigKeys.KEY_VALUE_PROFILING_ENABLED;
+import static com.linkedin.venice.ConfigKeys.KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED;
 import static com.linkedin.venice.ConfigKeys.LEADER_FOLLOWER_STATE_TRANSITION_THREAD_POOL_STRATEGY;
 import static com.linkedin.venice.ConfigKeys.LISTENER_HOSTNAME;
 import static com.linkedin.venice.ConfigKeys.LISTENER_PORT;
@@ -396,6 +397,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {
   private final boolean schemaPresenceCheckEnabled;
   private final boolean systemSchemaInitializationAtStartTimeEnabled;
+  private final boolean isKMERegistrationFromMessageHeaderEnabled;
   private final String localControllerUrl;
   private final String localControllerD2ServiceName;
   private final String localD2ZkHost;
@@ -645,6 +647,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map
   topicLockManager;
   private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
+  private Optional<SSLFactory> sslFactory;
+
+  private KafkaValueSerializer kafkaValueSerializer;
 
   public KafkaStoreIngestionService(
       StorageEngineRepository storageEngineRepository,
@@ -227,7 +236,8 @@ public KafkaStoreIngestionService(
       Optional cacheBackend,
       boolean isDaVinciClient,
       RemoteIngestionRepairService remoteIngestionRepairService,
-      PubSubClientsFactory pubSubClientsFactory) {
+      PubSubClientsFactory pubSubClientsFactory,
+      Optional<SSLFactory> sslFactory) {
     this.cacheBackend = cacheBackend;
     this.storageMetadataService = storageMetadataService;
     this.metadataRepo = metadataRepo;
@@ -239,6 +249,7 @@ public KafkaStoreIngestionService(
     this.compressorFactory = compressorFactory;
     // Each topic that has any partition ingested by this class has its own lock.
     this.topicLockManager = new ResourceAutoClosableLockManager<>(ReentrantLock::new);
+    this.sslFactory = sslFactory;
     customizedViewFuture.ifPresent(future -> future.thenApply(cv -> this.customizedViewRepository = cv));
     helixInstanceFuture.ifPresent(future -> future.thenApply(helix -> this.helixInstanceConfigRepository = helix));
@@ -399,8 +410,40 @@ public void handleStoreDeleted(Store store) {
               + "may not be killed if admin helix messaging channel is disabled");
     }
 
-    // TODO: Wire configs into these params
-    KafkaValueSerializer kafkaValueSerializer = new OptimizedKafkaValueSerializer();
+    /**
+     * Register a callback to handle the case where a new KME value schema is encountered while the server
+     * consumes messages from Kafka.
+     */
+    BiConsumer<Integer, Schema> newSchemaEncountered = (schemaId, schema) -> {
+      LOGGER.info("Encountered a new KME value schema (id = {}), proceed to register", schemaId);
+      try (ControllerClientBackedSystemSchemaInitializer schemaInitializer =
+          new ControllerClientBackedSystemSchemaInitializer(
+              AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE,
+              serverConfig.getSystemSchemaClusterName(),
+              null,
+              null,
+              false,
+              sslFactory,
+              serverConfig.getLocalControllerUrl(),
+              serverConfig.getLocalControllerD2ServiceName(),
+              serverConfig.getLocalD2ZkHost(),
+              false)) {
+        schemaInitializer.execute(ImmutableMap.of(schemaId, schema));
+      } catch (VeniceException e) {
+        LOGGER.error(
+            "Exception in registering '{}' schema version '{}'",
+            AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.name(),
+            schemaId,
+            e);
+        throw e;
+      }
+    };
+
+    // Don't apply the newSchemaEncountered callback for Da Vinci clients.
+    kafkaValueSerializer = (!isDaVinciClient && serverConfig.isKMERegistrationFromMessageHeaderEnabled())
+        ? new OptimizedKafkaValueSerializer(newSchemaEncountered)
+        : new OptimizedKafkaValueSerializer();
+
     kafkaMessageEnvelopeSchemaReader.ifPresent(kafkaValueSerializer::setSchemaReader);
     PubSubMessageDeserializer pubSubDeserializer = new PubSubMessageDeserializer(
         kafkaValueSerializer,
@@ -1358,4 +1401,9 @@ private void resetConsumptionOffset(VeniceStoreVersionConfig veniceStore, int pa
     }
     LOGGER.info("Offset reset to beginning - Kafka Partition: {}-{}.", topic, partitionId);
   }
+
+  // For testing purposes only.
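+  // Exposes the KME value serializer so tests can observe schema registration triggered from message headers.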
+  public KafkaValueSerializer getKafkaValueSerializer() {
+    return kafkaValueSerializer;
+  }
 }
diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java
index 407acd3457..33a569a8de 100644
--- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java
+++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionServiceTest.java
@@ -167,7 +167,8 @@ public void testDisableMetricsEmission() {
         Optional.empty(),
         false,
         null,
-        mockPubSubClientsFactory);
+        mockPubSubClientsFactory,
+        Optional.empty());
     String mockStoreName = "test";
     String mockSimilarStoreName = "testTest";
@@ -250,7 +251,8 @@ public void testGetIngestingTopicsNotWithOnlineVersion() {
         Optional.empty(),
         false,
         null,
-        mockPubSubClientsFactory);
+        mockPubSubClientsFactory,
+        Optional.empty());
     String topic1 = "test-store_v1";
     String topic2 = "test-store_v2";
     String invalidTopic = "invalid-store_v1";
@@ -337,7 +339,8 @@ public void testCloseStoreIngestionTask() {
         Optional.empty(),
         false,
         null,
-        mockPubSubClientsFactory);
+        mockPubSubClientsFactory,
+        Optional.empty());
     String topicName = "test-store_v1";
     String storeName = Version.parseStoreFromKafkaTopicName(topicName);
     Store mockStore = new ZKStore(
@@ -401,7 +404,8 @@ public void testStoreIngestionTaskShutdownLastPartition(boolean isIsolatedIngest
         Optional.empty(),
         false,
         null,
-        mockPubSubClientsFactory);
+        mockPubSubClientsFactory,
+        Optional.empty());
     String topicName = "test-store_v1";
     String storeName = Version.parseStoreFromKafkaTopicName(topicName);
     Store mockStore = new ZKStore(
@@ -463,7 +467,8 @@ public void testGetMetadata() {
         Optional.empty(),
         false,
         null,
-        mockPubSubClientsFactory);
+        mockPubSubClientsFactory,
+        Optional.empty());
     String storeName = "test-store";
     String otherStoreName = "test-store2";
     Store mockStore = new ZKStore(
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.java
index a320d73043..90c0f05d22 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.java
@@ -374,9 +374,11 @@ public SPECIFIC_RECORD deserialize(byte[] bytes, Schema providedProtocolSchema,
     VeniceSpecificDatumReader specificDatumReader =
         protocolVersionToReader.computeIfAbsent(protocolVersion, index -> {
           newSchemaEncountered.accept(protocolVersion, providedProtocolSchema);
+
+          // Add a new datum reader for the protocol version. Note that even when newSchemaEncountered is
+          // an empty lambda, the reader will still be added locally.
           return cacheDatumReader(protocolVersion, providedProtocolSchema);
         });
-
     return deserialize(bytes, specificDatumReader, reuse);
   }
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializer.java
index 44d5b5d368..cf47fde9ed 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializer.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializer.java
@@ -21,6 +21,7 @@
 import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
 import com.linkedin.venice.utils.RetryUtils;
 import com.linkedin.venice.utils.Utils;
+import java.io.Closeable;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
@@ -32,7 +33,7 @@
 import org.apache.logging.log4j.Logger;
 
 
-public class ControllerClientBackedSystemSchemaInitializer {
+public class ControllerClientBackedSystemSchemaInitializer implements Closeable {
   private static final Logger LOGGER = LogManager.getLogger(ControllerClientBackedSystemSchemaInitializer.class);
   private static final String DEFAULT_KEY_SCHEMA_STR = "\"int\"";
   /**
@@ -347,6 +348,13 @@ private void checkAndMayRegisterPartialUpdateSchema(
     }
   }
 
+  @Override
+  public void close() {
+    if (controllerClient != null) {
+      controllerClient.close();
+    }
+  }
+
   // For testing
   void setControllerClient(ControllerClient controllerClient) {
     this.controllerClient = controllerClient;
diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializerTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializerTest.java
index cfcaaff3e2..341b8bbfa2 100644
--- a/internal/venice-common/src/test/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializerTest.java
+++ b/internal/venice-common/src/test/java/com/linkedin/venice/system/store/ControllerClientBackedSystemSchemaInitializerTest.java
@@ -24,29 +24,28 @@
 public class ControllerClientBackedSystemSchemaInitializerTest {
-  ControllerClientBackedSystemSchemaInitializer initializer;
-
   @Test
   public void testCreateSystemStoreAndRegisterSchema() {
-    try {
-      initializer = new ControllerClientBackedSystemSchemaInitializer(
-          AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE,
-          "testCluster",
-          null,
-          null,
-          false,
-          Optional.empty(),
-          "",
-          "",
-          "",
-          false);
+    try (ControllerClientBackedSystemSchemaInitializer initializer = new ControllerClientBackedSystemSchemaInitializer(
+        AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE,
+        "testCluster",
+        null,
+        null,
+        false,
+        Optional.empty(),
+        "",
+        "",
+        "",
+        false)) {
       initializer.execute();
       Assert.fail("Exception should be thrown when neither controller url nor d2 config is provided");
     } catch (VeniceException e) {
       // expected
     }
-    initializer = new ControllerClientBackedSystemSchemaInitializer(
+
+    ControllerClient controllerClient = mock(ControllerClient.class);
+    try (ControllerClientBackedSystemSchemaInitializer initializer = new ControllerClientBackedSystemSchemaInitializer(
         AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE,
         "testCluster",
         null,
@@ -56,52 +55,56 @@ public void testCreateSystemStoreAndRegisterSchema() {
         "",
         "d2Service",
         "d2ZkHost",
-        false);
-    ControllerClient controllerClient = mock(ControllerClient.class);
-    doReturn("leaderControllerUrl").when(controllerClient).getLeaderControllerUrl();
-    D2ServiceDiscoveryResponse discoveryResponse = mock(D2ServiceDiscoveryResponse.class);
-    doReturn(true).when(discoveryResponse).isError();
-    doReturn(ErrorType.STORE_NOT_FOUND).when(discoveryResponse).getErrorType();
-    doReturn(discoveryResponse).when(controllerClient).discoverCluster(any());
-    StoreResponse storeResponse = mock(StoreResponse.class);
-    doReturn(true).when(storeResponse).isError();
-    doReturn(ErrorType.STORE_NOT_FOUND).when(storeResponse).getErrorType();
-    doReturn(storeResponse).when(controllerClient).getStore(any());
-    NewStoreResponse newStoreResponse = mock(NewStoreResponse.class);
-    doReturn(newStoreResponse).when(controllerClient).createNewSystemStore(any(), any(), any(), any());
-    MultiSchemaResponse multiSchemaResponse = mock(MultiSchemaResponse.class);
-    doReturn(new MultiSchemaResponse.Schema[0]).when(multiSchemaResponse).getSchemas();
-    doReturn(multiSchemaResponse).when(controllerClient).getAllValueSchema(any());
-    SchemaResponse schemaResponse = mock(SchemaResponse.class);
-    doReturn(schemaResponse).when(controllerClient).addValueSchema(any(), any(), anyInt(), any());
-    doCallRealMethod().when(controllerClient).retryableRequest(anyInt(), any(), any());
-    doCallRealMethod().when(controllerClient).retryableRequest(anyInt(), any());
-    initializer.setControllerClient(controllerClient);
-    initializer.execute();
-    verify(controllerClient, times(1)).createNewSystemStore(any(), any(), any(), any());
-    verify(controllerClient, times(AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion()))
-        .addValueSchema(any(), any(), anyInt(), any());
+        false)) {
+      doReturn("leaderControllerUrl").when(controllerClient).getLeaderControllerUrl();
+      D2ServiceDiscoveryResponse discoveryResponse = mock(D2ServiceDiscoveryResponse.class);
+      doReturn(true).when(discoveryResponse).isError();
+      doReturn(ErrorType.STORE_NOT_FOUND).when(discoveryResponse).getErrorType();
+      doReturn(discoveryResponse).when(controllerClient).discoverCluster(any());
+      StoreResponse storeResponse = mock(StoreResponse.class);
+      doReturn(true).when(storeResponse).isError();
+      doReturn(ErrorType.STORE_NOT_FOUND).when(storeResponse).getErrorType();
+      doReturn(storeResponse).when(controllerClient).getStore(any());
+      NewStoreResponse newStoreResponse = mock(NewStoreResponse.class);
+      doReturn(newStoreResponse).when(controllerClient).createNewSystemStore(any(), any(), any(), any());
+      MultiSchemaResponse multiSchemaResponse = mock(MultiSchemaResponse.class);
+      doReturn(new MultiSchemaResponse.Schema[0]).when(multiSchemaResponse).getSchemas();
+      doReturn(multiSchemaResponse).when(controllerClient).getAllValueSchema(any());
+      SchemaResponse schemaResponse = mock(SchemaResponse.class);
+      doReturn(schemaResponse).when(controllerClient).addValueSchema(any(), any(), anyInt(), any());
+      doCallRealMethod().when(controllerClient).retryableRequest(anyInt(), any(), any());
+      doCallRealMethod().when(controllerClient).retryableRequest(anyInt(), any());
+      initializer.setControllerClient(controllerClient);
+      initializer.execute();
+      verify(controllerClient, times(1)).createNewSystemStore(any(), any(), any(), any());
+      verify(controllerClient, times(AvroProtocolDefinition.METADATA_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion()))
+          .addValueSchema(any(), any(), anyInt(), any());
+    }
+    verify(controllerClient, times(1)).close();
   }
 
   @Test
   public void testSchemaCompatabilityType() {
     for (AvroProtocolDefinition protocol: AvroProtocolDefinition.values()) {
-      initializer = new ControllerClientBackedSystemSchemaInitializer(
-          protocol,
-          "testCluster",
-          null,
-          null,
-          false,
-          Optional.empty(),
-          "",
-          "",
-          "",
-          false);
-      if (protocol == AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE) {
-        Assert
-            .assertEquals(initializer.determineSchemaCompatabilityType(), DirectionalSchemaCompatibilityType.BACKWARD);
-      } else {
-        Assert.assertEquals(initializer.determineSchemaCompatabilityType(), DirectionalSchemaCompatibilityType.FULL);
+      try (
+          ControllerClientBackedSystemSchemaInitializer initializer = new ControllerClientBackedSystemSchemaInitializer(
+              protocol,
+              "testCluster",
+              null,
+              null,
+              false,
+              Optional.empty(),
+              "",
+              "",
+              "",
+              false)) {
+        if (protocol == AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE) {
+          Assert.assertEquals(
+              initializer.determineSchemaCompatabilityType(),
+              DirectionalSchemaCompatibilityType.BACKWARD);
+        } else {
+          Assert.assertEquals(initializer.determineSchemaCompatabilityType(), DirectionalSchemaCompatibilityType.FULL);
+        }
       }
     }
   }
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/helix/TestServerKMERegistrationFromMessageHeader.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/helix/TestServerKMERegistrationFromMessageHeader.java
new file mode 100644
index 0000000000..7009d74ef8
--- /dev/null
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/helix/TestServerKMERegistrationFromMessageHeader.java
@@ -0,0 +1,139 @@
+package com.linkedin.venice.helix;
+
+import static com.linkedin.venice.ConfigKeys.KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED;
+
+import com.linkedin.venice.compression.CompressionStrategy;
+import com.linkedin.venice.controllerapi.ControllerClient;
+import com.linkedin.venice.controllerapi.VersionCreationResponse;
+import com.linkedin.venice.integration.utils.ServiceFactory;
+import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
+import com.linkedin.venice.integration.utils.VeniceControllerWrapper;
+import com.linkedin.venice.integration.utils.VeniceServerWrapper;
+import com.linkedin.venice.meta.Version;
+import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
+import com.linkedin.venice.serialization.VeniceKafkaSerializer;
+import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
+import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
+import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
+import com.linkedin.venice.utils.IntegrationTestPushUtils;
+import com.linkedin.venice.utils.TestUtils;
+import com.linkedin.venice.utils.Utils;
+import com.linkedin.venice.writer.VeniceWriter;
+import com.linkedin.venice.writer.VeniceWriterOptions;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestServerKMERegistrationFromMessageHeader {
+  private static final Logger LOGGER = LogManager.getLogger(TestServerKMERegistrationFromMessageHeader.class);
+
+  private VeniceClusterWrapper cluster;
+  protected ControllerClient controllerClient;
+  int replicaFactor = 1;
+  int numOfController = 1;
+  int numOfRouters = 1;
+  int partitionSize = 1000;
+  private static final int TEST_TIMEOUT = 90_000; // ms
+  private String storeName;
+  private String storeVersion;
+  private static final String KEY_SCHEMA = "\"string\"";
+  private static final String VALUE_SCHEMA = "\"string\"";
+  private VeniceWriter veniceWriter;
+  private VeniceKafkaSerializer keySerializer;
+  private VeniceServerWrapper server;
+  private String clusterName;
+
+  @BeforeClass
+  public void setUp() {
+    cluster =
+        ServiceFactory.getVeniceCluster(numOfController, 0, numOfRouters, replicaFactor, partitionSize, false, false);
+    clusterName = cluster.getClusterName();
+
+    Properties serverProperties = new Properties();
+    Properties serverFeatureProperties = new Properties();
+    serverProperties.put(KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED, true);
+    server = cluster.addVeniceServer(serverFeatureProperties, serverProperties);
+  }
+
+  @AfterClass(alwaysRun = true)
+  public void cleanUp() {
+    Utils.closeQuietlyWithErrorLogged(cluster);
+  }
+
+  @Test(timeOut = TEST_TIMEOUT)
+  public void testServerKMERegistrationFromMessageHeader() {
+    storeName = Utils.getUniqueString("venice-store");
+    cluster.getNewStore(storeName, KEY_SCHEMA, VALUE_SCHEMA);
+    VersionCreationResponse creationResponse = cluster.getNewVersion(storeName, false);
+
+    storeVersion = creationResponse.getKafkaTopic();
+    int pushVersion = Version.parseVersionFromKafkaTopicName(storeVersion);
+    PubSubProducerAdapterFactory pubSubProducerAdapterFactory =
+        cluster.getPubSubBrokerWrapper().getPubSubClientsFactory().getProducerAdapterFactory();
+    keySerializer = new VeniceAvroKafkaSerializer(KEY_SCHEMA);
+
+    veniceWriter =
+        IntegrationTestPushUtils.getVeniceWriterFactory(cluster.getPubSubBrokerWrapper(), pubSubProducerAdapterFactory)
+            .createVeniceWriter(new VeniceWriterOptions.Builder(storeVersion).setKeySerializer(keySerializer).build());
+
+    VeniceControllerWrapper leaderController = cluster.getLeaderVeniceController();
+    KafkaValueSerializer valueSerializer =
+        server.getVeniceServer().getKafkaStoreIngestionService().getKafkaValueSerializer();
+
+    HelixReadWriteSchemaRepositoryAdapter adapter =
+        (HelixReadWriteSchemaRepositoryAdapter) (leaderController.getVeniceHelixAdmin()
+            .getHelixVeniceClusterResources(clusterName)
+            .getSchemaRepository());
+    HelixReadWriteSchemaRepository repo =
+        (HelixReadWriteSchemaRepository) adapter.getReadWriteRegularStoreSchemaRepository();
+
+    // Wait until the latest schema appears in the child colo's schema repository (ZK).
+    TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, true, () -> {
+      Assert.assertEquals(
+          repo.getValueSchema(
+              AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName(),
+              AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion()).getId(),
+          AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion());
+    });
+
+    // Remove the latest KME schema from the child colo's schema repository (ZK) and clear all schemas from the
+    // server's local value serializer.
+    repo.removeValueSchema(
+        AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName(),
+        AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion());
+    valueSerializer.removeAllSchemas();
+    LOGGER.info("all schemas are removed");
+
+    /*
+     * A start-of-segment control message carries the latest KME schema in its header.
+     * When the Venice server encounters the new schema in the message header and KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED
+     * is enabled, it registers the new schema in the child colo's schema repo and adds it to its local serializer.
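+     * The broadcastStartOfPush call below produces such control messages and is expected to exercise that path.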
+     */
+    veniceWriter.broadcastStartOfPush(false, false, CompressionStrategy.NO_OP, new HashMap<>());
+    veniceWriter.broadcastEndOfPush(new HashMap<>());
+
+    // Verify that the new schema is registered in the child colo's schema repo.
+    TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, true, true, () -> {
+      Assert.assertEquals(
+          repo.getValueSchema(
+              AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName(),
+              AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion()).getId(),
+          AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getCurrentProtocolVersion());
+    });
+
+    // Verify that the empty push can finish successfully and a new version is created.
+    String controllerUrl = cluster.getAllControllersURLs();
+    TestUtils.waitForNonDeterministicCompletion(30, TimeUnit.SECONDS, () -> {
+      int currentVersion =
+          ControllerClient.getStore(controllerUrl, cluster.getClusterName(), storeName).getStore().getCurrentVersion();
+      return currentVersion == pushVersion;
+    });
+  }
+}
diff --git a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java
index bb9c417ffb..d5b4904eb6 100644
--- a/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java
+++ b/services/venice-server/src/main/java/com/linkedin/venice/server/VeniceServer.java
@@ -381,7 +381,8 @@ private List createServices() {
         Optional.empty(),
         false,
         remoteIngestionRepairService,
-        pubSubClientsFactory);
+        pubSubClientsFactory,
+        sslFactory);
     this.kafkaStoreIngestionService.addMetaSystemStoreReplicaStatusNotifier();
     this.diskHealthCheckService = new DiskHealthCheckService(