Skip to content

Commit

Permalink
[server][test] Message header triggered new KME schema registration i…
Browse files Browse the repository at this point in the history
…n Venice server (#669)

* [server][test] Message header triggered new KME schema registration in Venice server

This is a follow-up change to 0b5af23 and it is also part of the solution to remove deployment
order for KafkaMessageEnvelope (KME) value schema. For Venice servers, by design, we apply the
same strategy as controllers, i.e., when servers find an unknown KME schema from the message header,
they need to request the system cluster leader controller to register the new schema into
local region system store.

This change enables Venice servers to perform the unknown KME schemas registration work when it
discovers. At its core, it leverages the existing functionalities from the
ControllerClientBackedSystemSchemaInitializer class for the implementation.
A new server config is introduced and a new integration test is added to verify its correctness.
  • Loading branch information
lluwm authored Sep 29, 2023
1 parent 867a4d1 commit a8af3a9
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ public DaVinciBackend(
true,
// TODO: consider how/if a repair task would be valid for Davinci users?
null,
pubSubClientsFactory);
pubSubClientsFactory,
Optional.empty());

ingestionService.start();
ingestionService.addIngestionNotifier(ingestionListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static com.linkedin.venice.ConfigKeys.KAFKA_READ_ONLY_ADMIN_CLASS;
import static com.linkedin.venice.ConfigKeys.KAFKA_WRITE_ONLY_ADMIN_CLASS;
import static com.linkedin.venice.ConfigKeys.KEY_VALUE_PROFILING_ENABLED;
import static com.linkedin.venice.ConfigKeys.KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED;
import static com.linkedin.venice.ConfigKeys.LEADER_FOLLOWER_STATE_TRANSITION_THREAD_POOL_STRATEGY;
import static com.linkedin.venice.ConfigKeys.LISTENER_HOSTNAME;
import static com.linkedin.venice.ConfigKeys.LISTENER_PORT;
Expand Down Expand Up @@ -396,6 +397,7 @@ public class VeniceServerConfig extends VeniceClusterConfig {

private final boolean schemaPresenceCheckEnabled;
private final boolean systemSchemaInitializationAtStartTimeEnabled;
private final boolean isKMERegistrationFromMessageHeaderEnabled;
private final String localControllerUrl;
private final String localControllerD2ServiceName;
private final String localD2ZkHost;
Expand Down Expand Up @@ -645,6 +647,8 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
schemaPresenceCheckEnabled = serverProperties.getBoolean(SERVER_SCHEMA_PRESENCE_CHECK_ENABLED, true);
systemSchemaInitializationAtStartTimeEnabled =
serverProperties.getBoolean(SYSTEM_SCHEMA_INITIALIZATION_AT_START_TIME_ENABLED, false);
isKMERegistrationFromMessageHeaderEnabled =
serverProperties.getBoolean(KME_REGISTRATION_FROM_MESSAGE_HEADER_ENABLED, false);
localControllerUrl = serverProperties.getString(LOCAL_CONTROLLER_URL, "");
localControllerD2ServiceName = serverProperties.getString(LOCAL_CONTROLLER_D2_SERVICE_NAME, "");
localD2ZkHost = serverProperties.getString(LOCAL_D2_ZK_HOST, "");
Expand Down Expand Up @@ -1252,4 +1256,8 @@ public String getRouterPrincipalName() {
public int getIngestionTaskMaxIdleCount() {
return ingestionTaskMaxIdleCount;
}

public boolean isKMERegistrationFromMessageHeaderEnabled() {
return isKMERegistrationFromMessageHeaderEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,8 @@ private void initializeIsolatedIngestionServer() {
Optional.empty(),
isDaVinciClient,
repairService,
pubSubClientsFactory);
pubSubClientsFactory,
sslFactory);
storeIngestionService.start();
storeIngestionService.addIngestionNotifier(new IsolatedIngestionNotifier(this));
ingestionBackend = new DefaultIngestionBackend(storageMetadataService, storeIngestionService, storageService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static java.lang.Thread.currentThread;
import static java.lang.Thread.sleep;

import com.google.common.collect.ImmutableMap;
import com.linkedin.davinci.compression.StorageEngineBackedCompressorFactory;
import com.linkedin.davinci.config.VeniceConfigLoader;
import com.linkedin.davinci.config.VeniceServerConfig;
Expand Down Expand Up @@ -74,11 +75,14 @@
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.security.SSLFactory;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.InternalAvroSpecificSerializer;
import com.linkedin.venice.serialization.avro.KafkaValueSerializer;
import com.linkedin.venice.serialization.avro.OptimizedKafkaValueSerializer;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.system.store.ControllerClientBackedSystemSchemaInitializer;
import com.linkedin.venice.system.store.MetaStoreWriter;
import com.linkedin.venice.throttle.EventThrottler;
import com.linkedin.venice.utils.ComplementSet;
Expand Down Expand Up @@ -119,8 +123,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import org.apache.avro.Schema;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -205,6 +211,9 @@ public class KafkaStoreIngestionService extends AbstractVeniceService implements
private final ResourceAutoClosableLockManager<String> topicLockManager;

private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
private Optional<SSLFactory> sslFactory;

private KafkaValueSerializer kafkaValueSerializer;

public KafkaStoreIngestionService(
StorageEngineRepository storageEngineRepository,
Expand All @@ -227,7 +236,8 @@ public KafkaStoreIngestionService(
Optional<ObjectCacheBackend> cacheBackend,
boolean isDaVinciClient,
RemoteIngestionRepairService remoteIngestionRepairService,
PubSubClientsFactory pubSubClientsFactory) {
PubSubClientsFactory pubSubClientsFactory,
Optional<SSLFactory> sslFactory) {
this.cacheBackend = cacheBackend;
this.storageMetadataService = storageMetadataService;
this.metadataRepo = metadataRepo;
Expand All @@ -239,6 +249,7 @@ public KafkaStoreIngestionService(
this.compressorFactory = compressorFactory;
// Each topic that has any partition ingested by this class has its own lock.
this.topicLockManager = new ResourceAutoClosableLockManager<>(ReentrantLock::new);
this.sslFactory = sslFactory;

customizedViewFuture.ifPresent(future -> future.thenApply(cv -> this.customizedViewRepository = cv));
helixInstanceFuture.ifPresent(future -> future.thenApply(helix -> this.helixInstanceConfigRepository = helix));
Expand Down Expand Up @@ -399,8 +410,40 @@ public void handleStoreDeleted(Store store) {
+ "may not be killed if admin helix messaging channel is disabled");
}

// TODO: Wire configs into these params
KafkaValueSerializer kafkaValueSerializer = new OptimizedKafkaValueSerializer();
/**
* Register a callback function to handle the case when a new KME value schema is encountered when the server
* consumes messages from Kafka.
*/
BiConsumer<Integer, Schema> newSchemaEncountered = (schemaId, schema) -> {
LOGGER.info("Encountered a new KME value schema (id = {}), proceed to register", schemaId);
try (ControllerClientBackedSystemSchemaInitializer schemaInitializer =
new ControllerClientBackedSystemSchemaInitializer(
AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE,
serverConfig.getSystemSchemaClusterName(),
null,
null,
false,
sslFactory,
serverConfig.getLocalControllerUrl(),
serverConfig.getLocalControllerD2ServiceName(),
serverConfig.getLocalD2ZkHost(),
false)) {
schemaInitializer.execute(ImmutableMap.of(schemaId, schema));
} catch (VeniceException e) {
LOGGER.error(
"Exception in registering '{}' schema version '{}'",
AvroProtocolDefinition.KAFKA_MESSAGE_ENVELOPE.name(),
schemaId,
e);
throw e;
}
};

// Don't apply newSchemaEncountered callbacks for da vinci client.
kafkaValueSerializer = (!isDaVinciClient && serverConfig.isKMERegistrationFromMessageHeaderEnabled())
? new OptimizedKafkaValueSerializer(newSchemaEncountered)
: new OptimizedKafkaValueSerializer();

kafkaMessageEnvelopeSchemaReader.ifPresent(kafkaValueSerializer::setSchemaReader);
PubSubMessageDeserializer pubSubDeserializer = new PubSubMessageDeserializer(
kafkaValueSerializer,
Expand Down Expand Up @@ -1358,4 +1401,9 @@ private void resetConsumptionOffset(VeniceStoreVersionConfig veniceStore, int pa
}
LOGGER.info("Offset reset to beginning - Kafka Partition: {}-{}.", topic, partitionId);
}

// For testing purpose only.
public KafkaValueSerializer getKafkaValueSerializer() {
return kafkaValueSerializer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public void testDisableMetricsEmission() {
Optional.empty(),
false,
null,
mockPubSubClientsFactory);
mockPubSubClientsFactory,
Optional.empty());

String mockStoreName = "test";
String mockSimilarStoreName = "testTest";
Expand Down Expand Up @@ -250,7 +251,8 @@ public void testGetIngestingTopicsNotWithOnlineVersion() {
Optional.empty(),
false,
null,
mockPubSubClientsFactory);
mockPubSubClientsFactory,
Optional.empty());
String topic1 = "test-store_v1";
String topic2 = "test-store_v2";
String invalidTopic = "invalid-store_v1";
Expand Down Expand Up @@ -337,7 +339,8 @@ public void testCloseStoreIngestionTask() {
Optional.empty(),
false,
null,
mockPubSubClientsFactory);
mockPubSubClientsFactory,
Optional.empty());
String topicName = "test-store_v1";
String storeName = Version.parseStoreFromKafkaTopicName(topicName);
Store mockStore = new ZKStore(
Expand Down Expand Up @@ -401,7 +404,8 @@ public void testStoreIngestionTaskShutdownLastPartition(boolean isIsolatedIngest
Optional.empty(),
false,
null,
mockPubSubClientsFactory);
mockPubSubClientsFactory,
Optional.empty());
String topicName = "test-store_v1";
String storeName = Version.parseStoreFromKafkaTopicName(topicName);
Store mockStore = new ZKStore(
Expand Down Expand Up @@ -463,7 +467,8 @@ public void testGetMetadata() {
Optional.empty(),
false,
null,
mockPubSubClientsFactory);
mockPubSubClientsFactory,
Optional.empty());
String storeName = "test-store";
String otherStoreName = "test-store2";
Store mockStore = new ZKStore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,11 @@ public SPECIFIC_RECORD deserialize(byte[] bytes, Schema providedProtocolSchema,
VeniceSpecificDatumReader<SPECIFIC_RECORD> specificDatumReader =
protocolVersionToReader.computeIfAbsent(protocolVersion, index -> {
newSchemaEncountered.accept(protocolVersion, providedProtocolSchema);

// Add a new datum reader for the protocol version. Notice that in that case that newSchemaEncountered is
// an empty lambda, the reader will still be added locally.
return cacheDatumReader(protocolVersion, providedProtocolSchema);
});

return deserialize(bytes, specificDatumReader, reuse);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.utils.RetryUtils;
import com.linkedin.venice.utils.Utils;
import java.io.Closeable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -32,7 +33,7 @@
import org.apache.logging.log4j.Logger;


public class ControllerClientBackedSystemSchemaInitializer {
public class ControllerClientBackedSystemSchemaInitializer implements Closeable {
private static final Logger LOGGER = LogManager.getLogger(ControllerClientBackedSystemSchemaInitializer.class);
private static final String DEFAULT_KEY_SCHEMA_STR = "\"int\"";
/**
Expand Down Expand Up @@ -347,6 +348,13 @@ private void checkAndMayRegisterPartialUpdateSchema(
}
}

@Override
public void close() {
if (controllerClient != null) {
controllerClient.close();
}
}

// For testing
void setControllerClient(ControllerClient controllerClient) {
this.controllerClient = controllerClient;
Expand Down
Loading

0 comments on commit a8af3a9

Please sign in to comment.