diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java
index fed25c5408..c2dd46adc9 100755
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java
@@ -28,6 +28,7 @@
 import com.linkedin.venice.controllerapi.D2ControllerClientFactory;
 import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
 import com.linkedin.venice.controllerapi.MultiSchemaResponse;
+import com.linkedin.venice.controllerapi.RepushInfo;
 import com.linkedin.venice.controllerapi.RepushInfoResponse;
 import com.linkedin.venice.controllerapi.SchemaResponse;
 import com.linkedin.venice.controllerapi.StoreResponse;
@@ -168,6 +169,9 @@ public class VenicePushJob implements AutoCloseable {
   public static final String SEND_CONTROL_MESSAGES_DIRECTLY = "send.control.messages.directly";
   public static final String SOURCE_ETL = "source.etl";
   public static final String ETL_VALUE_SCHEMA_TRANSFORMATION = "etl.value.schema.transformation";
+  public static final String SYSTEM_SCHEMA_READER_ENABLED = "system.schema.reader.enabled";
+  public static final String SYSTEM_SCHEMA_CLUSTER_D2_SERVICE_NAME = "system.schema.cluster.d2.service.name";
+  public static final String SYSTEM_SCHEMA_CLUSTER_D2_ZK_HOST = "system.schema.cluster.d2.zk.host";

   /**
    * Config to enable/disable the feature to collect extra metrics wrt compression.
@@ -539,6 +543,9 @@ protected static class PushJobSetting {
     String targetedRegions;
     boolean isTargetedRegionPushEnabled;
     boolean postValidationConsumption;
+    boolean isSystemSchemaReaderEnabled;
+    String systemSchemaClusterD2ServiceName;
+    String systemSchemaClusterD2ZKHost;
   }

   protected PushJobSetting pushJobSetting;
@@ -715,6 +722,7 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) {
     pushJobSettingToReturn.repushTTLStartTimeMs = props.getLong(REPUSH_TTL_START_TIMESTAMP, System.currentTimeMillis());
     pushJobSettingToReturn.isTargetedRegionPushEnabled = props.getBoolean(TARGETED_REGION_PUSH_ENABLED, false);
     pushJobSettingToReturn.postValidationConsumption = props.getBoolean(POST_VALIDATION_CONSUMPTION_ENABLED, true);
+    pushJobSettingToReturn.isSystemSchemaReaderEnabled = props.getBoolean(SYSTEM_SCHEMA_READER_ENABLED, false);
     if (pushJobSettingToReturn.isIncrementalPush && pushJobSettingToReturn.isTargetedRegionPushEnabled) {
       throw new VeniceException("Incremental push is not supported while using targeted region push mode");
     }
@@ -834,9 +842,7 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) {
    * @param properties properties
    * @return Topic name
    */
-  private String getSourceTopicNameForKafkaInput(
-      final String userProvidedStoreName,
-      final VeniceProperties properties) {
+  String getSourceTopicNameForKafkaInput(final String userProvidedStoreName, final VeniceProperties properties) {
     final Optional userProvidedTopicNameOptional =
         Optional.ofNullable(properties.getString(KAFKA_INPUT_TOPIC, () -> null));
@@ -1813,9 +1819,19 @@ private void initControllerClient(String storeName, Optional sslFact

   protected void initKIFRepushDetails() {
     pushJobSetting.kafkaInputTopic = getSourceTopicNameForKafkaInput(pushJobSetting.storeName, props);
-    pushJobSetting.kafkaInputBrokerUrl = pushJobSetting.repushInfoResponse == null
-        ? props.getString(KAFKA_INPUT_BROKER_URL)
-        : pushJobSetting.repushInfoResponse.getRepushInfo().getKafkaBrokerUrl();
+    if (pushJobSetting.repushInfoResponse == null) {
+      pushJobSetting.kafkaInputBrokerUrl = props.getString(KAFKA_INPUT_BROKER_URL);
+    } else {
+      RepushInfo repushInfo = pushJobSetting.repushInfoResponse.getRepushInfo();
+      pushJobSetting.kafkaInputBrokerUrl = repushInfo.getKafkaBrokerUrl();
+      pushJobSetting.systemSchemaClusterD2ServiceName = repushInfo.getSystemSchemaClusterD2ServiceName();
+      pushJobSetting.systemSchemaClusterD2ZKHost = repushInfo.getSystemSchemaClusterD2ZkHost();
+    }
+    if (pushJobSetting.isSystemSchemaReaderEnabled
+        && (StringUtils.isEmpty(pushJobSetting.systemSchemaClusterD2ServiceName)
+            || StringUtils.isEmpty(pushJobSetting.systemSchemaClusterD2ZKHost))) {
+      throw new VeniceException("D2 service name and zk host must be provided when system schema reader is enabled");
+    }
   }

   private ControllerClient getControllerClient(
@@ -3015,6 +3031,12 @@ protected void setupDefaultJobConf(
           KAFKA_INPUT_SOURCE_TOPIC_CHUNKING_ENABLED,
           Boolean.toString(storeSetting.sourceKafkaInputVersionInfo.isChunkingEnabled()));
+      conf.setBoolean(SYSTEM_SCHEMA_READER_ENABLED, pushJobSetting.isSystemSchemaReaderEnabled);
+      if (pushJobSetting.isSystemSchemaReaderEnabled) {
+        conf.set(SYSTEM_SCHEMA_CLUSTER_D2_SERVICE_NAME, pushJobSetting.systemSchemaClusterD2ServiceName);
+        conf.set(SYSTEM_SCHEMA_CLUSTER_D2_ZK_HOST, pushJobSetting.systemSchemaClusterD2ZKHost);
+        conf.set(SSL_FACTORY_CLASS_NAME, props.getString(SSL_FACTORY_CLASS_NAME, DEFAULT_SSL_FACTORY_CLASS_NAME));
+      }
     } else {
       conf.setInt(VALUE_SCHEMA_ID_PROP, pushJobSchemaInfo.getValueSchemaId());
       conf.setInt(DERIVED_SCHEMA_ID_PROP, pushJobSchemaInfo.getDerivedSchemaId());
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputDictTrainer.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputDictTrainer.java
index 2795538e07..c27d881e2e 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputDictTrainer.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputDictTrainer.java
@@ -15,7 +15,6 @@
 import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapterFactory;
 import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
 import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
-import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
 import com.linkedin.venice.utils.ByteUtils;
 import com.linkedin.venice.utils.VeniceProperties;
 import com.linkedin.venice.utils.pools.LandFillObjectPool;
@@ -208,7 +207,7 @@ synchronized byte[] trainDict(Optional reusedConsumerOpti
             KafkaInputUtils.getConsumerProperties(jobConf),
             false,
             new PubSubMessageDeserializer(
-                new OptimizedKafkaValueSerializer(),
+                KafkaInputUtils.getKafkaValueSerializer(jobConf),
                 new LandFillObjectPool<>(KafkaMessageEnvelope::new),
                 new LandFillObjectPool<>(KafkaMessageEnvelope::new)),
             null));
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java
index 5d56a68612..40a3afa24a 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputRecordReader.java
@@ -22,7 +22,6 @@
 import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
 import com.linkedin.venice.pubsub.api.PubSubTopic;
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
-import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
 import com.linkedin.venice.utils.Utils;
 import com.linkedin.venice.utils.pools.LandFillObjectPool;
 import java.io.IOException;
@@ -100,7 +99,7 @@ public KafkaInputRecordReader(InputSplit split, JobConf job, Reporter reporter)
         KafkaInputUtils.getConsumerProperties(job),
         false,
         new PubSubMessageDeserializer(
-            new OptimizedKafkaValueSerializer(),
+            KafkaInputUtils.getKafkaValueSerializer(job),
             new LandFillObjectPool<>(KafkaMessageEnvelope::new),
             new LandFillObjectPool<>(KafkaMessageEnvelope::new)),
         null),
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputUtils.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputUtils.java
index 567633b659..0b14d9664b 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputUtils.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/input/kafka/KafkaInputUtils.java
@@ -1,9 +1,18 @@
 package com.linkedin.venice.hadoop.input.kafka;

+import static com.linkedin.venice.CommonConfigKeys.SSL_FACTORY_CLASS_NAME;
 import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
 import static com.linkedin.venice.hadoop.VenicePushJob.KAFKA_INPUT_BROKER_URL;
 import static com.linkedin.venice.hadoop.VenicePushJob.SSL_CONFIGURATOR_CLASS_CONFIG;
+import static com.linkedin.venice.hadoop.VenicePushJob.SYSTEM_SCHEMA_CLUSTER_D2_SERVICE_NAME;
+import static com.linkedin.venice.hadoop.VenicePushJob.SYSTEM_SCHEMA_CLUSTER_D2_ZK_HOST;
+import static com.linkedin.venice.hadoop.VenicePushJob.SYSTEM_SCHEMA_READER_ENABLED;

+import com.linkedin.d2.balancer.D2Client;
+import com.linkedin.d2.balancer.D2ClientBuilder;
+import com.linkedin.venice.D2.D2ClientUtils;
+import com.linkedin.venice.client.store.ClientConfig;
+import com.linkedin.venice.client.store.ClientFactory;
 import com.linkedin.venice.compression.CompressionStrategy;
 import com.linkedin.venice.compression.CompressorFactory;
 import com.linkedin.venice.compression.VeniceCompressor;
@@ -12,20 +21,28 @@
 import com.linkedin.venice.hadoop.ssl.UserCredentialsFactory;
 import com.linkedin.venice.hadoop.utils.HadoopUtils;
 import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig;
+import com.linkedin.venice.schema.SchemaReader;
+import com.linkedin.venice.security.SSLFactory;
+import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
+import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
+import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
 import com.linkedin.venice.utils.ByteUtils;
 import com.linkedin.venice.utils.DictionaryUtils;
 import com.linkedin.venice.utils.KafkaSSLUtils;
+import com.linkedin.venice.utils.SslUtils;
 import com.linkedin.venice.utils.VeniceProperties;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Optional;
 import java.util.Properties;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.kafka.clients.CommonClientConfigs;


 public class KafkaInputUtils {
+  private static Properties sslProps = null;
+
   public static VeniceProperties getConsumerProperties(JobConf config) {
-    Properties sslProps = null;
     Properties consumerFactoryProperties = new Properties();
     if (config.get(SSL_CONFIGURATOR_CLASS_CONFIG) != null) {
       SSLConfigurator configurator = SSLConfigurator.getSSLConfigurator(config.get(SSL_CONFIGURATOR_CLASS_CONFIG));
@@ -55,6 +72,35 @@ public static VeniceProperties getConsumerProperties(JobConf config) {
     return new VeniceProperties(consumerFactoryProperties);
   }

+  public static KafkaValueSerializer getKafkaValueSerializer(JobConf config) {
+    KafkaValueSerializer kafkaValueSerializer = new OptimizedKafkaValueSerializer();
+    boolean isSchemaReaderEnabled = Boolean.parseBoolean(config.get(SYSTEM_SCHEMA_READER_ENABLED, "false"));
+    if (isSchemaReaderEnabled) {
+      Optional sslFactory = Optional.empty();
+      if (sslProps != null) {
+        String sslFactoryClassName = config.get(SSL_FACTORY_CLASS_NAME);
+        sslFactory = Optional.of(SslUtils.getSSLFactory(sslProps, sslFactoryClassName));
+      }
+      String systemSchemaClusterD2ZKHost = config.get(SYSTEM_SCHEMA_CLUSTER_D2_ZK_HOST);
+      D2Client d2Client = new D2ClientBuilder().setZkHosts(systemSchemaClusterD2ZKHost)
+          .setSSLContext(sslFactory.map(SSLFactory::getSSLContext).orElse(null))
+          .setIsSSLEnabled(sslFactory.isPresent())
+          .setSSLParameters(sslFactory.map(SSLFactory::getSSLParameters).orElse(null))
+          .build();
+      D2ClientUtils.startClient(d2Client);
+
+      String systemSchemaClusterD2ServiceName = config.get(SYSTEM_SCHEMA_CLUSTER_D2_SERVICE_NAME);
+      String kafkaMessageEnvelopeSystemStoreName = AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName();
+      ClientConfig kafkaMessageEnvelopeClientConfig =
+          ClientConfig.defaultGenericClientConfig(kafkaMessageEnvelopeSystemStoreName);
+      kafkaMessageEnvelopeClientConfig.setD2ServiceName(systemSchemaClusterD2ServiceName);
+      kafkaMessageEnvelopeClientConfig.setD2Client(d2Client);
+      SchemaReader kafkaMessageEnvelopeSchemaReader = ClientFactory.getSchemaReader(kafkaMessageEnvelopeClientConfig);
+      kafkaValueSerializer.setSchemaReader(kafkaMessageEnvelopeSchemaReader);
+    }
+    return kafkaValueSerializer;
+  }
+
   public static VeniceCompressor getCompressor(
       CompressorFactory compressorFactory,
       CompressionStrategy strategy,
diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java
index f7707c0b71..e90bb06c13 100644
--- a/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java
+++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java
@@ -11,6 +11,7 @@
 import static com.linkedin.venice.hadoop.VenicePushJob.REPUSH_TTL_ENABLE;
 import static com.linkedin.venice.hadoop.VenicePushJob.SOURCE_ETL;
 import static com.linkedin.venice.hadoop.VenicePushJob.SOURCE_KAFKA;
+import static com.linkedin.venice.hadoop.VenicePushJob.SYSTEM_SCHEMA_READER_ENABLED;
 import static com.linkedin.venice.hadoop.VenicePushJob.TARGETED_REGION_PUSH_ENABLED;
 import static com.linkedin.venice.hadoop.VenicePushJob.TARGETED_REGION_PUSH_LIST;
 import static com.linkedin.venice.hadoop.VenicePushJob.VALUE_FIELD_PROP;
@@ -46,6 +47,8 @@
 import com.linkedin.venice.controllerapi.ControllerResponse;
 import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse;
 import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
+import com.linkedin.venice.controllerapi.RepushInfo;
+import com.linkedin.venice.controllerapi.RepushInfoResponse;
 import com.linkedin.venice.controllerapi.SchemaResponse;
 import com.linkedin.venice.controllerapi.StoreResponse;
 import com.linkedin.venice.controllerapi.VersionCreationResponse;
@@ -661,6 +664,20 @@ public void testTargetedRegionPushPostValidationFailedForValidation() throws Exc
     verify(pushJob, never()).postValidationConsumption(any());
   }

+  @Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = "D2 service name and zk host must be provided when system schema reader is enabled")
+  public void testEnableSchemaReaderConfigValidation() {
+    Properties props = getVpjRequiredProperties();
+    props.put(SYSTEM_SCHEMA_READER_ENABLED, true);
+    VenicePushJob pushJob = spy(new VenicePushJob(PUSH_JOB_ID, props));
+    doReturn("test_store_v1").when(pushJob).getSourceTopicNameForKafkaInput(anyString(), any());
+    RepushInfoResponse repushInfoResponse = new RepushInfoResponse();
+    RepushInfo repushInfo = new RepushInfo();
+    repushInfo.setSystemSchemaClusterD2ServiceName("cluster0");
+    repushInfoResponse.setRepushInfo(repushInfo);
+    pushJob.getPushJobSetting().repushInfoResponse = repushInfoResponse;
+    pushJob.initKIFRepushDetails();
+  }
+
   private JobStatusQueryResponse mockJobStatusQuery() {
     JobStatusQueryResponse response = new JobStatusQueryResponse();
     response.setStatus(ExecutionStatus.COMPLETED.toString());
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/RepushInfo.java b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/RepushInfo.java
index 15c1197cc0..89ce44e067 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/RepushInfo.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/controllerapi/RepushInfo.java
@@ -8,11 +8,19 @@ public class RepushInfo {
   private String kafkaBrokerUrl;
   private Version version;
-
-  public static RepushInfo createRepushInfo(Version version, String kafkaBrokerUrl) {
+  private String systemSchemaClusterD2ServiceName;
+  private String systemSchemaClusterD2ZkHost;
+
+  public static RepushInfo createRepushInfo(
+      Version version,
+      String kafkaBrokerUrl,
+      String systemSchemaClusterD2ServiceName,
+      String systemSchemaClusterD2ZkHost) {
     RepushInfo repushInfo = new RepushInfo();
     repushInfo.setVersion(version);
     repushInfo.setKafkaBrokerUrl(kafkaBrokerUrl);
+    repushInfo.setSystemSchemaClusterD2ServiceName(systemSchemaClusterD2ServiceName);
+    repushInfo.setSystemSchemaClusterD2ZkHost(systemSchemaClusterD2ZkHost);
     return repushInfo;
   }

@@ -24,6 +32,14 @@ public void setKafkaBrokerUrl(String kafkaBrokerUrl) {
     this.kafkaBrokerUrl = kafkaBrokerUrl;
   }

+  public void setSystemSchemaClusterD2ServiceName(String systemSchemaClusterD2ServiceName) {
+    this.systemSchemaClusterD2ServiceName = systemSchemaClusterD2ServiceName;
+  }
+
+  public void setSystemSchemaClusterD2ZkHost(String systemSchemaClusterD2ZkHost) {
+    this.systemSchemaClusterD2ZkHost = systemSchemaClusterD2ZkHost;
+  }
+
   public String getKafkaBrokerUrl() {
     return this.kafkaBrokerUrl;
   }
@@ -31,4 +47,12 @@ public String getKafkaBrokerUrl() {
   public Version getVersion() {
     return version;
   }
+
+  public String getSystemSchemaClusterD2ServiceName() {
+    return systemSchemaClusterD2ServiceName;
+  }
+
+  public String getSystemSchemaClusterD2ZkHost() {
+    return systemSchemaClusterD2ZkHost;
+  }
 }
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapter.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapter.java
index 1052d07a2d..30bab74544 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapter.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/adapter/kafka/consumer/ApacheKafkaConsumerAdapter.java
@@ -299,6 +299,9 @@ public void close() {
         LOGGER.warn("{} threw an exception while closing.", kafkaConsumer.getClass().getSimpleName(), e);
       }
     }
+    if (pubSubMessageDeserializer != null) {
+      pubSubMessageDeserializer.close();
+    }
   }

   @Override
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java
index 82dc3ffbef..4488b41f84 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializer.java
@@ -101,6 +101,12 @@ private KafkaMessageEnvelope getEnvelope(byte keyHeaderByte) {
     }
   }

+  public void close() {
+    if (valueSerializer != null) {
+      valueSerializer.close();
+    }
+  }
+
   // For testing only.
   public KafkaValueSerializer getValueSerializer() {
     return valueSerializer;
diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.java b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.java
index 90c0f05d22..f01cc18c40 100644
--- a/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.java
+++ b/internal/venice-common/src/main/java/com/linkedin/venice/serialization/avro/InternalAvroSpecificSerializer.java
@@ -179,7 +179,13 @@ protected InternalAvroSpecificSerializer(
    */
   @Override
   public void close() {
-
+    if (schemaReader != null) {
+      try {
+        schemaReader.close();
+      } catch (IOException e) {
+        LOGGER.warn("Caught IOException while closing the schema reader", e);
+      }
+    }
   }

   public IntSet knownProtocols() {
@@ -450,9 +456,4 @@ private String getCurrentlyLoadedProtocolVersions() {
   public void removeAllSchemas() {
     this.protocolVersionToReader.clear();
   }
-
-  // For testing purpose only.
-  public int getProtocolVersionSize() {
-    return protocolVersionToReader.size();
-  }
 }
diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializerTest.java b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializerTest.java
index f7fedca1fd..7cf8951c48 100644
--- a/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializerTest.java
+++ b/internal/venice-common/src/test/java/com/linkedin/venice/pubsub/api/PubSubMessageDeserializerTest.java
@@ -17,6 +17,7 @@
 import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
 import com.linkedin.venice.utils.pools.LandFillObjectPool;
 import java.nio.ByteBuffer;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;

@@ -43,6 +44,11 @@ public void setUp() {
     valueSerializer = new OptimizedKafkaValueSerializer();
   }

+  @AfterMethod
+  public void cleanUp() {
+    messageDeserializer.close();
+  }
+
   @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*Illegal key header byte.*")
   public void testDeserializerFailsWhenKeyValueFormatIsInvalid() {
     messageDeserializer
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTest.java
index adf096b671..959ebda325 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTest.java
@@ -16,7 +16,6 @@
 import com.linkedin.venice.partitioner.VenicePartitioner;
 import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerAdapter;
 import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig;
-import com.linkedin.venice.pubsub.api.PubSubProducerAdapter;
 import com.linkedin.venice.serialization.DefaultSerializer;
 import com.linkedin.venice.serialization.VeniceKafkaSerializer;
 import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
@@ -96,12 +95,12 @@ public abstract class ConsumerIntegrationTest {
     NEW_PROTOCOL_SCHEMA.setFields(protocolSchemaFields);
   }

-  private VeniceClusterWrapper cluster;
+  VeniceClusterWrapper cluster;
+  String store;
+  int version;
+  AvroGenericStoreClient client;
   private ControllerClient controllerClient;
-  private String store;
-  private int version;
   private String topicName;
-  private AvroGenericStoreClient client;

   @BeforeClass
   public void sharedSetUp() {
@@ -158,6 +157,13 @@ public void testForwardCompatibility() throws ExecutionException, InterruptedExc
       writeAndVerifyRecord(regularVeniceWriter, client, "value1");
     }

+    try (VeniceWriter veniceWriterWithNewerProtocol =
+        getVeniceWriterWithNewerProtocol(getOverrideProtocolSchema(), topicName)) {
+      writeAndVerifyRecord(veniceWriterWithNewerProtocol, client, "value2");
+    }
+  }
+
+  VeniceWriterWithNewerProtocol getVeniceWriterWithNewerProtocol(Schema overrideProtocolSchema, String topicName) {
     Properties javaProps = new Properties();
     javaProps
         .put(ApacheKafkaProducerConfig.KAFKA_VALUE_SERIALIZER, KafkaValueSerializerWithNewerProtocol.class.getName());
@@ -175,21 +181,14 @@ public void testForwardCompatibility() throws ExecutionException, InterruptedExc
         .setTime(time)
         .setPartitioner(partitioner)
         .build();
-
-    try (VeniceWriter veniceWriterWithNewerProtocol = getVeniceWriter(
+    return new VeniceWriterWithNewerProtocol(
         veniceWriterOptions,
         props,
         new ApacheKafkaProducerWithNewerProtocolAdapter(props),
-        NEW_PROTOCOL_SCHEMA)) {
-      writeAndVerifyRecord(veniceWriterWithNewerProtocol, client, "value2");
-    }
+        overrideProtocolSchema);
   }

-  abstract VeniceWriterWithNewerProtocol getVeniceWriter(
-      VeniceWriterOptions veniceWriterOptions,
-      VeniceProperties props,
-      PubSubProducerAdapter producerAdapter,
-      Schema overrideProtocolSchema);
+  abstract Schema getOverrideProtocolSchema();

   private void writeAndVerifyRecord(
       VeniceWriter veniceWriter,
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTestWithProtocolHeader.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTestWithProtocolHeader.java
index 3da57f392f..3f3dd91df4 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTestWithProtocolHeader.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTestWithProtocolHeader.java
@@ -1,18 +1,11 @@
 package com.linkedin.venice.consumer;

-import com.linkedin.venice.pubsub.api.PubSubProducerAdapter;
-import com.linkedin.venice.utils.VeniceProperties;
-import com.linkedin.venice.writer.VeniceWriterOptions;
 import org.apache.avro.Schema;


 public class ConsumerIntegrationTestWithProtocolHeader extends ConsumerIntegrationTest {
   @Override
-  VeniceWriterWithNewerProtocol getVeniceWriter(
-      VeniceWriterOptions veniceWriterOptions,
-      VeniceProperties props,
-      PubSubProducerAdapter producerAdapter,
-      Schema overrideProtocolSchema) {
-    return new VeniceWriterWithNewerProtocol(veniceWriterOptions, props, producerAdapter, NEW_PROTOCOL_SCHEMA);
+  Schema getOverrideProtocolSchema() {
+    return NEW_PROTOCOL_SCHEMA;
   }
 }
diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTestWithSchemaReader.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTestWithSchemaReader.java
index 46773eb9a3..5e13a277cc 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTestWithSchemaReader.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/consumer/ConsumerIntegrationTestWithSchemaReader.java
@@ -1,17 +1,26 @@
 package com.linkedin.venice.consumer;

+import static com.linkedin.venice.hadoop.VenicePushJob.KAFKA_INPUT_FABRIC;
+import static com.linkedin.venice.hadoop.VenicePushJob.KAFKA_INPUT_MAX_RECORDS_PER_MAPPER;
+import static com.linkedin.venice.hadoop.VenicePushJob.SOURCE_KAFKA;
+import static com.linkedin.venice.hadoop.VenicePushJob.SYSTEM_SCHEMA_READER_ENABLED;
+import static com.linkedin.venice.utils.IntegrationTestPushUtils.defaultVPJProps;
+
 import com.linkedin.venice.controllerapi.ControllerClient;
 import com.linkedin.venice.controllerapi.MultiSchemaResponse;
 import com.linkedin.venice.integration.utils.VeniceClusterWrapper;
-import com.linkedin.venice.pubsub.api.PubSubProducerAdapter;
+import com.linkedin.venice.meta.Version;
 import com.linkedin.venice.schema.avro.DirectionalSchemaCompatibilityType;
 import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
 import com.linkedin.venice.utils.TestUtils;
-import com.linkedin.venice.utils.VeniceProperties;
-import com.linkedin.venice.writer.VeniceWriterOptions;
+import com.linkedin.venice.utils.TestWriteUtils;
+import com.linkedin.venice.writer.VeniceWriter;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.avro.Schema;
 import org.testng.Assert;
+import org.testng.annotations.Test;


 public class ConsumerIntegrationTestWithSchemaReader extends ConsumerIntegrationTest {
@@ -42,11 +51,35 @@ void extraBeforeClassSetUp(VeniceClusterWrapper cluster, ControllerClient contro
   }

   @Override
-  VeniceWriterWithNewerProtocol getVeniceWriter(
-      VeniceWriterOptions veniceWriterOptions,
-      VeniceProperties props,
-      PubSubProducerAdapter producerAdapter,
-      Schema overrideProtocolSchema) {
-    return new VeniceWriterWithNewerProtocol(veniceWriterOptions, props, producerAdapter, null);
+  Schema getOverrideProtocolSchema() {
+    return null;
+  }
+
+  @Test
+  public void testKIFRepushForwardCompatibility() throws Exception {
+    // Write a new record to VT with new KME protocol
+    String versionTopic = Version.composeKafkaTopic(store, version);
+    try (VeniceWriter veniceWriterWithNewerProtocol =
+        getVeniceWriterWithNewerProtocol(getOverrideProtocolSchema(), versionTopic)) {
+      veniceWriterWithNewerProtocol.put("test_key", "test_value", 1).get();
+    }
+
+    // Run the repush job, which will fetch the new KME schema via schema reader to deserialize the new record
+    Properties vpjProps = defaultVPJProps(cluster, "Ignored", store);
+    vpjProps.setProperty(SOURCE_KAFKA, "true");
+    vpjProps.setProperty(KAFKA_INPUT_FABRIC, "dc-0");
+    vpjProps.setProperty(KAFKA_INPUT_MAX_RECORDS_PER_MAPPER, "5");
+    vpjProps.setProperty(SYSTEM_SCHEMA_READER_ENABLED, "true");
+    TestWriteUtils.runPushJob("Test push job", vpjProps);
+
+    TestUtils.waitForNonDeterministicAssertion(30, TimeUnit.SECONDS, () -> {
+      try {
+        Object value = client.get("test_key").get();
+        Assert.assertNotNull(value, "The key written with the new protocol is not in the store yet.");
+        Assert.assertEquals(value.toString(), "test_value", "The key written with new protocol is not valid.");
+      } catch (ExecutionException e) {
+        Assert.fail("Caught exception: " + e.getMessage());
+      }
+    });
   }
 }
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java
index a4b22b48fd..4262e6e5fd 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java
@@ -3031,14 +3031,21 @@ public Map getCurrentVersionsForMultiColos(String clusterName,
   }

   /**
-   * @return a new {@linkplain RepushInfo} object with specified store info.
+   * @return a new {@linkplain RepushInfo} object with specified store info.
    */
   @Override
   public RepushInfo getRepushInfo(String clusterName, String storeName, Optional fabricName) {
     Store store = getStore(clusterName, storeName);
     boolean isSSL = isSSLEnabledForPush(clusterName, storeName);
-    return RepushInfo
-        .createRepushInfo(store.getVersion(store.getCurrentVersion()).get(), getKafkaBootstrapServers(isSSL));
+    String systemSchemaClusterName = multiClusterConfigs.getSystemSchemaClusterName();
+    VeniceControllerConfig systemSchemaClusterConfig = multiClusterConfigs.getControllerConfig(systemSchemaClusterName);
+    String systemSchemaClusterD2Service = systemSchemaClusterConfig.getClusterToD2Map().get(systemSchemaClusterName);
+    String systemSchemaClusterD2ZkHost = systemSchemaClusterConfig.getChildControllerD2ZkHost(getRegionName());
+    return RepushInfo.createRepushInfo(
+        store.getVersion(store.getCurrentVersion()).get(),
+        getKafkaBootstrapServers(isSSL),
+        systemSchemaClusterD2Service,
+        systemSchemaClusterD2ZkHost);
   }

   /**
diff --git a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
index 18c526ddb5..926ffa3a25 100644
--- a/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
+++ b/services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java
@@ -1676,6 +1676,8 @@ public Map getCurrentVersionsForMultiColos(String clusterName,
   @Override
   public RepushInfo getRepushInfo(String clusterName, String storeName, Optional fabricName) {
     Map controllerClients = getVeniceHelixAdmin().getControllerClientMap(clusterName);
+    String systemSchemaClusterName = multiClusterConfigs.getSystemSchemaClusterName();
+    VeniceControllerConfig systemSchemaClusterConfig = multiClusterConfigs.getControllerConfig(systemSchemaClusterName);

     if (fabricName.isPresent()) {
       StoreResponse response = controllerClients.get(fabricName.get()).getStore(storeName);
@@ -1686,7 +1688,9 @@ public RepushInfo getRepushInfo(String clusterName, String storeName, Optional
     currentVersionsMap =
@@ -1707,7 +1711,9 @@ public RepushInfo getRepushInfo(String clusterName, String storeName, Optional