[vpj][controller][common] Add KME schema reader into repush #623

Merged: 2 commits, Oct 31, 2023
@@ -28,6 +28,7 @@
import com.linkedin.venice.controllerapi.D2ControllerClientFactory;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.RepushInfo;
import com.linkedin.venice.controllerapi.RepushInfoResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
@@ -168,6 +169,9 @@ public class VenicePushJob implements AutoCloseable {
public static final String SEND_CONTROL_MESSAGES_DIRECTLY = "send.control.messages.directly";
public static final String SOURCE_ETL = "source.etl";
public static final String ETL_VALUE_SCHEMA_TRANSFORMATION = "etl.value.schema.transformation";
public static final String SYSTEM_SCHEMA_READER_ENABLED = "system.schema.reader.enabled";
public static final String SYSTEM_SCHEMA_CLUSTER_D2_SERVICE_NAME = "system.schema.cluster.d2.service.name";
public static final String SYSTEM_SCHEMA_CLUSTER_D2_ZK_HOST = "system.schema.cluster.d2.zk.host";
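For reference, a minimal sketch of how a repush could opt in to the new flag; property values here are hypothetical, and the D2 coordinates are deliberately absent because the controller supplies them (see initKIFRepushDetails below).

Properties repushProps = new Properties();
// ... the usual required repush properties (store name, controller discovery, etc.) are assumed ...
repushProps.setProperty(SOURCE_KAFKA, "true");                 // repush reads from Kafka input
repushProps.setProperty(SYSTEM_SCHEMA_READER_ENABLED, "true"); // opt in to the D2-backed KME schema reader
try (VenicePushJob pushJob = new VenicePushJob("kme-schema-reader-repush", repushProps)) {
  pushJob.run();
}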

/**
* Config to enable/disable the feature to collect extra metrics wrt compression.
@@ -539,6 +543,9 @@ protected static class PushJobSetting {
String targetedRegions;
boolean isTargetedRegionPushEnabled;
boolean postValidationConsumption;
boolean isSystemSchemaReaderEnabled;
String systemSchemaClusterD2ServiceName;
String systemSchemaClusterD2ZKHost;
}

protected PushJobSetting pushJobSetting;
@@ -715,6 +722,7 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) {
pushJobSettingToReturn.repushTTLStartTimeMs = props.getLong(REPUSH_TTL_START_TIMESTAMP, System.currentTimeMillis());
pushJobSettingToReturn.isTargetedRegionPushEnabled = props.getBoolean(TARGETED_REGION_PUSH_ENABLED, false);
pushJobSettingToReturn.postValidationConsumption = props.getBoolean(POST_VALIDATION_CONSUMPTION_ENABLED, true);
pushJobSettingToReturn.isSystemSchemaReaderEnabled = props.getBoolean(SYSTEM_SCHEMA_READER_ENABLED, false);
if (pushJobSettingToReturn.isIncrementalPush && pushJobSettingToReturn.isTargetedRegionPushEnabled) {
throw new VeniceException("Incremental push is not supported while using targeted region push mode");
}
@@ -834,9 +842,7 @@ private PushJobSetting getPushJobSetting(VeniceProperties props) {
* @param properties properties
* @return Topic name
*/
private String getSourceTopicNameForKafkaInput(
final String userProvidedStoreName,
final VeniceProperties properties) {
String getSourceTopicNameForKafkaInput(final String userProvidedStoreName, final VeniceProperties properties) {
final Optional<String> userProvidedTopicNameOptional =
Optional.ofNullable(properties.getString(KAFKA_INPUT_TOPIC, () -> null));

@@ -1813,9 +1819,19 @@ private void initControllerClient(String storeName, Optional<SSLFactory> sslFact

protected void initKIFRepushDetails() {
pushJobSetting.kafkaInputTopic = getSourceTopicNameForKafkaInput(pushJobSetting.storeName, props);
pushJobSetting.kafkaInputBrokerUrl = pushJobSetting.repushInfoResponse == null
? props.getString(KAFKA_INPUT_BROKER_URL)
: pushJobSetting.repushInfoResponse.getRepushInfo().getKafkaBrokerUrl();
if (pushJobSetting.repushInfoResponse == null) {
pushJobSetting.kafkaInputBrokerUrl = props.getString(KAFKA_INPUT_BROKER_URL);
} else {
RepushInfo repushInfo = pushJobSetting.repushInfoResponse.getRepushInfo();
pushJobSetting.kafkaInputBrokerUrl = repushInfo.getKafkaBrokerUrl();
pushJobSetting.systemSchemaClusterD2ServiceName = repushInfo.getSystemSchemaClusterD2ServiceName();
pushJobSetting.systemSchemaClusterD2ZKHost = repushInfo.getSystemSchemaClusterD2ZkHost();
}
if (pushJobSetting.isSystemSchemaReaderEnabled
&& (StringUtils.isEmpty(pushJobSetting.systemSchemaClusterD2ServiceName)
|| StringUtils.isEmpty(pushJobSetting.systemSchemaClusterD2ZKHost))) {
throw new VeniceException("D2 service name and zk host must be provided when system schema reader is enabled");
}
}

private ControllerClient getControllerClient(
@@ -3015,6 +3031,12 @@ protected void setupDefaultJobConf(
KAFKA_INPUT_SOURCE_TOPIC_CHUNKING_ENABLED,
Boolean.toString(storeSetting.sourceKafkaInputVersionInfo.isChunkingEnabled()));

conf.setBoolean(SYSTEM_SCHEMA_READER_ENABLED, pushJobSetting.isSystemSchemaReaderEnabled);
if (pushJobSetting.isSystemSchemaReaderEnabled) {
conf.set(SYSTEM_SCHEMA_CLUSTER_D2_SERVICE_NAME, pushJobSetting.systemSchemaClusterD2ServiceName);
conf.set(SYSTEM_SCHEMA_CLUSTER_D2_ZK_HOST, pushJobSetting.systemSchemaClusterD2ZKHost);
conf.set(SSL_FACTORY_CLASS_NAME, props.getString(SSL_FACTORY_CLASS_NAME, DEFAULT_SSL_FACTORY_CLASS_NAME));
}
} else {
conf.setInt(VALUE_SCHEMA_ID_PROP, pushJobSchemaInfo.getValueSchemaId());
conf.setInt(DERIVED_SCHEMA_ID_PROP, pushJobSchemaInfo.getDerivedSchemaId());
@@ -15,7 +15,6 @@
import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
@@ -208,7 +207,7 @@ synchronized byte[] trainDict(Optional<PubSubConsumerAdapter> reusedConsumerOpti
KafkaInputUtils.getConsumerProperties(jobConf),
false,
new PubSubMessageDeserializer(
new OptimizedKafkaValueSerializer(),
KafkaInputUtils.getKafkaValueSerializer(jobConf),
new LandFillObjectPool<>(KafkaMessageEnvelope::new),
new LandFillObjectPool<>(KafkaMessageEnvelope::new)),
null));
@@ -22,7 +22,6 @@
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import java.io.IOException;
@@ -100,7 +99,7 @@ public KafkaInputRecordReader(InputSplit split, JobConf job, Reporter reporter)
KafkaInputUtils.getConsumerProperties(job),
false,
new PubSubMessageDeserializer(
new OptimizedKafkaValueSerializer(),
KafkaInputUtils.getKafkaValueSerializer(job),
new LandFillObjectPool<>(KafkaMessageEnvelope::new),
new LandFillObjectPool<>(KafkaMessageEnvelope::new)),
null),
@@ -1,9 +1,18 @@
package com.linkedin.venice.hadoop.input.kafka;

import static com.linkedin.venice.CommonConfigKeys.SSL_FACTORY_CLASS_NAME;
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.hadoop.VenicePushJob.KAFKA_INPUT_BROKER_URL;
import static com.linkedin.venice.hadoop.VenicePushJob.SSL_CONFIGURATOR_CLASS_CONFIG;
import static com.linkedin.venice.hadoop.VenicePushJob.SYSTEM_SCHEMA_CLUSTER_D2_SERVICE_NAME;
import static com.linkedin.venice.hadoop.VenicePushJob.SYSTEM_SCHEMA_CLUSTER_D2_ZK_HOST;
import static com.linkedin.venice.hadoop.VenicePushJob.SYSTEM_SCHEMA_READER_ENABLED;

import com.linkedin.d2.balancer.D2Client;
import com.linkedin.d2.balancer.D2ClientBuilder;
import com.linkedin.venice.D2.D2ClientUtils;
import com.linkedin.venice.client.store.ClientConfig;
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.compression.CompressionStrategy;
import com.linkedin.venice.compression.CompressorFactory;
import com.linkedin.venice.compression.VeniceCompressor;
@@ -12,20 +21,28 @@
import com.linkedin.venice.hadoop.ssl.UserCredentialsFactory;
import com.linkedin.venice.hadoop.utils.HadoopUtils;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.DictionaryUtils;
import com.linkedin.venice.utils.KafkaSSLUtils;
import com.linkedin.venice.utils.SslUtils;
import com.linkedin.venice.utils.VeniceProperties;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.Properties;
import org.apache.hadoop.mapred.JobConf;
import org.apache.kafka.clients.CommonClientConfigs;


public class KafkaInputUtils {
private static Properties sslProps = null;

public static VeniceProperties getConsumerProperties(JobConf config) {
Properties sslProps = null;
Properties consumerFactoryProperties = new Properties();
if (config.get(SSL_CONFIGURATOR_CLASS_CONFIG) != null) {
SSLConfigurator configurator = SSLConfigurator.getSSLConfigurator(config.get(SSL_CONFIGURATOR_CLASS_CONFIG));
@@ -55,6 +72,35 @@ public static VeniceProperties getConsumerProperties(JobConf config) {
return new VeniceProperties(consumerFactoryProperties);
}

public static KafkaValueSerializer getKafkaValueSerializer(JobConf config) {
KafkaValueSerializer kafkaValueSerializer = new OptimizedKafkaValueSerializer();
boolean isSchemaReaderEnabled = Boolean.parseBoolean(config.get(SYSTEM_SCHEMA_READER_ENABLED, "false"));
if (isSchemaReaderEnabled) {
Optional<SSLFactory> sslFactory = Optional.empty();
if (sslProps != null) {
String sslFactoryClassName = config.get(SSL_FACTORY_CLASS_NAME);
sslFactory = Optional.of(SslUtils.getSSLFactory(sslProps, sslFactoryClassName));
}
String systemSchemaClusterD2ZKHost = config.get(SYSTEM_SCHEMA_CLUSTER_D2_ZK_HOST);
D2Client d2Client = new D2ClientBuilder().setZkHosts(systemSchemaClusterD2ZKHost)
.setSSLContext(sslFactory.map(SSLFactory::getSSLContext).orElse(null))
.setIsSSLEnabled(sslFactory.isPresent())
.setSSLParameters(sslFactory.map(SSLFactory::getSSLParameters).orElse(null))
.build();
D2ClientUtils.startClient(d2Client);

String systemSchemaClusterD2ServiceName = config.get(SYSTEM_SCHEMA_CLUSTER_D2_SERVICE_NAME);
String kafkaMessageEnvelopeSystemStoreName = AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.getSystemStoreName();
ClientConfig kafkaMessageEnvelopeClientConfig =
ClientConfig.defaultGenericClientConfig(kafkaMessageEnvelopeSystemStoreName);
kafkaMessageEnvelopeClientConfig.setD2ServiceName(systemSchemaClusterD2ServiceName);
kafkaMessageEnvelopeClientConfig.setD2Client(d2Client);
SchemaReader kafkaMessageEnvelopeSchemaReader = ClientFactory.getSchemaReader(kafkaMessageEnvelopeClientConfig);
kafkaValueSerializer.setSchemaReader(kafkaMessageEnvelopeSchemaReader);
}
return kafkaValueSerializer;
}
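A short usage sketch (values hypothetical) of the JobConf keys this helper reads; in the real flow they are populated by setupDefaultJobConf in VenicePushJob.

JobConf conf = new JobConf();
conf.setBoolean(SYSTEM_SCHEMA_READER_ENABLED, true);
conf.set(SYSTEM_SCHEMA_CLUSTER_D2_SERVICE_NAME, "VeniceSystemSchemaCluster"); // hypothetical service name
conf.set(SYSTEM_SCHEMA_CLUSTER_D2_ZK_HOST, "zk.example.com:2181");            // hypothetical ZK host
// When the flag is false this simply returns a plain OptimizedKafkaValueSerializer;
// when true, the serializer is wired to a D2-backed SchemaReader for the KME system store.
KafkaValueSerializer valueSerializer = KafkaInputUtils.getKafkaValueSerializer(conf);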

public static VeniceCompressor getCompressor(
CompressorFactory compressorFactory,
CompressionStrategy strategy,
@@ -11,6 +11,7 @@
import static com.linkedin.venice.hadoop.VenicePushJob.REPUSH_TTL_ENABLE;
import static com.linkedin.venice.hadoop.VenicePushJob.SOURCE_ETL;
import static com.linkedin.venice.hadoop.VenicePushJob.SOURCE_KAFKA;
import static com.linkedin.venice.hadoop.VenicePushJob.SYSTEM_SCHEMA_READER_ENABLED;
import static com.linkedin.venice.hadoop.VenicePushJob.TARGETED_REGION_PUSH_ENABLED;
import static com.linkedin.venice.hadoop.VenicePushJob.TARGETED_REGION_PUSH_LIST;
import static com.linkedin.venice.hadoop.VenicePushJob.VALUE_FIELD_PROP;
@@ -46,6 +47,8 @@
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.controllerapi.D2ServiceDiscoveryResponse;
import com.linkedin.venice.controllerapi.JobStatusQueryResponse;
import com.linkedin.venice.controllerapi.RepushInfo;
import com.linkedin.venice.controllerapi.RepushInfoResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
@@ -661,6 +664,20 @@ public void testTargetedRegionPushPostValidationFailedForValidation() throws Exc
verify(pushJob, never()).postValidationConsumption(any());
}

@Test(expectedExceptions = VeniceException.class, expectedExceptionsMessageRegExp = "D2 service name and zk host must be provided when system schema reader is enabled")
public void testEnableSchemaReaderConfigValidation() {
Properties props = getVpjRequiredProperties();
props.put(SYSTEM_SCHEMA_READER_ENABLED, true);
VenicePushJob pushJob = spy(new VenicePushJob(PUSH_JOB_ID, props));
doReturn("test_store_v1").when(pushJob).getSourceTopicNameForKafkaInput(anyString(), any());
RepushInfoResponse repushInfoResponse = new RepushInfoResponse();
RepushInfo repushInfo = new RepushInfo();
repushInfo.setSystemSchemaClusterD2ServiceName("cluster0");
repushInfoResponse.setRepushInfo(repushInfo);
pushJob.getPushJobSetting().repushInfoResponse = repushInfoResponse;
pushJob.initKIFRepushDetails();
}
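A possible companion test, not part of this change, sketching the happy path where the controller returns both D2 coordinates:

@Test
public void testEnableSchemaReaderWithCompleteD2Info() {
  Properties props = getVpjRequiredProperties();
  props.put(SYSTEM_SCHEMA_READER_ENABLED, true);
  VenicePushJob pushJob = spy(new VenicePushJob(PUSH_JOB_ID, props));
  doReturn("test_store_v1").when(pushJob).getSourceTopicNameForKafkaInput(anyString(), any());
  RepushInfo repushInfo = new RepushInfo();
  repushInfo.setKafkaBrokerUrl("localhost:9092");            // hypothetical broker URL
  repushInfo.setSystemSchemaClusterD2ServiceName("cluster0");
  repushInfo.setSystemSchemaClusterD2ZkHost("localhost:2181");
  RepushInfoResponse repushInfoResponse = new RepushInfoResponse();
  repushInfoResponse.setRepushInfo(repushInfo);
  pushJob.getPushJobSetting().repushInfoResponse = repushInfoResponse;
  pushJob.initKIFRepushDetails(); // should not throw
}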

private JobStatusQueryResponse mockJobStatusQuery() {
JobStatusQueryResponse response = new JobStatusQueryResponse();
response.setStatus(ExecutionStatus.COMPLETED.toString());
@@ -8,11 +8,19 @@
public class RepushInfo {
private String kafkaBrokerUrl;
private Version version;

public static RepushInfo createRepushInfo(Version version, String kafkaBrokerUrl) {
private String systemSchemaClusterD2ServiceName;
private String systemSchemaClusterD2ZkHost;

public static RepushInfo createRepushInfo(
Version version,
String kafkaBrokerUrl,
String systemSchemaClusterD2ServiceName,
String systemSchemaClusterD2ZkHost) {
RepushInfo repushInfo = new RepushInfo();
repushInfo.setVersion(version);
repushInfo.setKafkaBrokerUrl(kafkaBrokerUrl);
repushInfo.setSystemSchemaClusterD2ServiceName(systemSchemaClusterD2ServiceName);
repushInfo.setSystemSchemaClusterD2ZkHost(systemSchemaClusterD2ZkHost);
return repushInfo;
}

@@ -24,11 +32,27 @@ public void setKafkaBrokerUrl(String kafkaBrokerUrl) {
this.kafkaBrokerUrl = kafkaBrokerUrl;
}

public void setSystemSchemaClusterD2ServiceName(String systemSchemaClusterD2ServiceName) {
this.systemSchemaClusterD2ServiceName = systemSchemaClusterD2ServiceName;
}

public void setSystemSchemaClusterD2ZkHost(String systemSchemaClusterD2ZkHost) {
this.systemSchemaClusterD2ZkHost = systemSchemaClusterD2ZkHost;
}

public String getKafkaBrokerUrl() {
return this.kafkaBrokerUrl;
}

public Version getVersion() {
return version;
}

public String getSystemSchemaClusterD2ServiceName() {
return systemSchemaClusterD2ServiceName;
}

public String getSystemSchemaClusterD2ZkHost() {
return systemSchemaClusterD2ZkHost;
}
}
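For context, a hypothetical controller-side construction using the widened factory method (the actual call sites are outside this diff):

RepushInfo repushInfo = RepushInfo.createRepushInfo(
    version,                          // source Version being repushed (assumed in scope)
    "kafka.example.com:9092",         // Kafka broker URL (hypothetical)
    "VeniceSystemSchemaCluster",      // system schema cluster D2 service name (hypothetical)
    "zk.example.com:2181");           // system schema cluster D2 ZK host (hypothetical)
RepushInfoResponse response = new RepushInfoResponse();
response.setRepushInfo(repushInfo);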
@@ -299,6 +299,9 @@ public void close() {
LOGGER.warn("{} threw an exception while closing.", kafkaConsumer.getClass().getSimpleName(), e);
}
}
if (pubSubMessageDeserializer != null) {
pubSubMessageDeserializer.close();
}
}

@Override
@@ -101,6 +101,12 @@ private KafkaMessageEnvelope getEnvelope(byte keyHeaderByte) {
}
}

public void close() {
if (valueSerializer != null) {
valueSerializer.close();
}
}
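The new close() completes a cleanup chain; a sketch of the intended flow, assuming the deserializer was built the way KafkaInputRecordReader now builds it:

PubSubMessageDeserializer deserializer = new PubSubMessageDeserializer(
    KafkaInputUtils.getKafkaValueSerializer(jobConf),        // may hold a D2-backed SchemaReader
    new LandFillObjectPool<>(KafkaMessageEnvelope::new),
    new LandFillObjectPool<>(KafkaMessageEnvelope::new));
// ... deserialize records ...
deserializer.close(); // -> valueSerializer.close() -> schemaReader.close() in InternalAvroSpecificSerializer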

// For testing only.
public KafkaValueSerializer getValueSerializer() {
return valueSerializer;
@@ -179,7 +179,13 @@ protected InternalAvroSpecificSerializer(
*/
@Override
public void close() {

if (schemaReader != null) {
try {
schemaReader.close();
} catch (IOException e) {
LOGGER.warn("Caught IOException while closing the schema reader", e);
}
}
}

public IntSet knownProtocols() {
@@ -450,9 +456,4 @@ private String getCurrentlyLoadedProtocolVersions() {
public void removeAllSchemas() {
this.protocolVersionToReader.clear();
}

// For testing purpose only.
public int getProtocolVersionSize() {
return protocolVersionToReader.size();
}
}
@@ -17,6 +17,7 @@
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.utils.pools.LandFillObjectPool;
import java.nio.ByteBuffer;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@@ -43,6 +44,11 @@ public void setUp() {
valueSerializer = new OptimizedKafkaValueSerializer();
}

@AfterMethod
public void cleanUp() {
messageDeserializer.close();
}

@Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*Illegal key header byte.*")
public void testDeserializerFailsWhenKeyValueFormatIsInvalid() {
messageDeserializer
Expand Down