Skip to content

Commit

Permalink
[server][da-vinci] Switch to dedicated producers from shared producer…
Browse files Browse the repository at this point in the history
… in test cases (linkedin#923)

This commit switches test cases from using a shared producer, which is not
supported in production, to dedicated producers. The shared producer code
has been deleted as it is currently not useful.
  • Loading branch information
sushantmane authored Apr 4, 2024
1 parent b7f874e commit 2ab82a9
Show file tree
Hide file tree
Showing 17 changed files with 1 addition and 1,076 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import static com.linkedin.venice.ConfigKeys.SERVER_INGESTION_TASK_MAX_IDLE_COUNT;
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_CONSUMER_OFFSET_COLLECTION_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_MAX_POLL_RECORDS;
import static com.linkedin.venice.ConfigKeys.SERVER_KAFKA_PRODUCER_POOL_SIZE_PER_KAFKA_CLUSTER;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED;
Expand Down Expand Up @@ -112,7 +111,6 @@
import static com.linkedin.venice.ConfigKeys.SERVER_SCHEMA_PRESENCE_CHECK_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_SHARED_CONSUMER_ASSIGNMENT_STRATEGY;
import static com.linkedin.venice.ConfigKeys.SERVER_SHARED_CONSUMER_NON_EXISTING_TOPIC_CLEANUP_DELAY_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_SHARED_KAFKA_PRODUCER_ENABLED;
import static com.linkedin.venice.ConfigKeys.SERVER_SHUTDOWN_DISK_UNHEALTHY_TIME_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_SOURCE_TOPIC_OFFSET_CHECK_INTERVAL_MS;
import static com.linkedin.venice.ConfigKeys.SERVER_SSL_HANDSHAKE_QUEUE_CAPACITY;
Expand Down Expand Up @@ -391,8 +389,6 @@ public class VeniceServerConfig extends VeniceClusterConfig {
private final long sharedConsumerNonExistingTopicCleanupDelayMS;
private final int offsetLagDeltaRelaxFactorForFastOnlineTransitionInRestart;

private final boolean sharedKafkaProducerEnabled;
private final int sharedProducerPoolSizePerKafkaCluster;
private final Set<String> kafkaProducerMetrics;
/**
* Boolean flag indicating if it is a Da Vinci application.
Expand Down Expand Up @@ -653,9 +649,6 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
systemSchemaClusterName = serverProperties.getString(SYSTEM_SCHEMA_CLUSTER_NAME, "");
sharedConsumerNonExistingTopicCleanupDelayMS = serverProperties
.getLong(SERVER_SHARED_CONSUMER_NON_EXISTING_TOPIC_CLEANUP_DELAY_MS, TimeUnit.MINUTES.toMillis(10));
sharedKafkaProducerEnabled = serverProperties.getBoolean(SERVER_SHARED_KAFKA_PRODUCER_ENABLED, false);
sharedProducerPoolSizePerKafkaCluster =
serverProperties.getInt(SERVER_KAFKA_PRODUCER_POOL_SIZE_PER_KAFKA_CLUSTER, 8);

List<String> kafkaProducerMetricsList = serverProperties.getList(
KAFKA_PRODUCER_METRICS,
Expand Down Expand Up @@ -1156,18 +1149,6 @@ public long getSharedConsumerNonExistingTopicCleanupDelayMS() {
return sharedConsumerNonExistingTopicCleanupDelayMS;
}

public boolean isSharedKafkaProducerEnabled() {
return sharedKafkaProducerEnabled;
}

public int getSharedProducerPoolSizePerKafkaCluster() {
return sharedProducerPoolSizePerKafkaCluster;
}

public Set<String> getKafkaProducerMetrics() {
return kafkaProducerMetrics;
}

public boolean isDaVinciClient() {
return isDaVinciClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@
import com.linkedin.venice.pubsub.PubSubProducerAdapterFactory;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerAdapterFactory;
import com.linkedin.venice.pubsub.adapter.kafka.producer.ApacheKafkaProducerConfig;
import com.linkedin.venice.pubsub.adapter.kafka.producer.SharedKafkaProducerAdapterFactory;
import com.linkedin.venice.pubsub.api.PubSubMessageDeserializer;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
Expand Down Expand Up @@ -231,22 +229,7 @@ public KafkaStoreIngestionService(
veniceConfigLoader.getVeniceClusterConfig().getClusterProperties().toProperties();

veniceWriterProperties.put(PubSubConstants.PUBSUB_PRODUCER_USE_HIGH_THROUGHPUT_DEFAULTS, "true");

// TODO: Move shared producer factory construction to upper layer and pass it in here.
LOGGER.info(
"Shared kafka producer service is {}",
serverConfig.isSharedKafkaProducerEnabled() ? "enabled" : "disabled");
if (serverConfig.isSharedKafkaProducerEnabled()) {
producerAdapterFactory = new SharedKafkaProducerAdapterFactory(
veniceWriterProperties,
serverConfig.getSharedProducerPoolSizePerKafkaCluster(),
new ApacheKafkaProducerAdapterFactory(),
metricsRepository,
serverConfig.getKafkaProducerMetrics());
} else {
producerAdapterFactory = pubSubClientsFactory.getProducerAdapterFactory();
}

producerAdapterFactory = pubSubClientsFactory.getProducerAdapterFactory();
VeniceWriterFactory veniceWriterFactory =
new VeniceWriterFactory(veniceWriterProperties, producerAdapterFactory, metricsRepository);
VeniceWriterFactory veniceWriterFactoryForMetaStoreWriter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -786,17 +786,6 @@ private ConfigKeys() {
public static final String FREEZE_INGESTION_IF_READY_TO_SERVE_OR_LOCAL_DATA_EXISTS =
"freeze.ingestion.if.ready.to.serve.or.local.data.exists";

/**
* Whether to enable shared kafka producer in storage node.
*/
public static final String SERVER_SHARED_KAFKA_PRODUCER_ENABLED = "server.shared.kafka.producer.enabled";

/**
* Shared kafka producer pool size per Kafka cluster.
*/
public static final String SERVER_KAFKA_PRODUCER_POOL_SIZE_PER_KAFKA_CLUSTER =
"server.kafka.producer.pool.size.per.kafka.cluster";

/**
* a comma seperated list of kafka producer metrics that will be reported.
* For ex. "outgoing-byte-rate,record-send-rate,batch-size-max,batch-size-avg,buffer-available-bytes,buffer-exhausted-rate"
Expand Down

This file was deleted.

Loading

0 comments on commit 2ab82a9

Please sign in to comment.