Commit
Merge branch 'linkedin:main' into main
kvargha authored Jan 31, 2024
2 parents a12e1a5 + ee40965 commit 8231b72
Showing 139 changed files with 3,215 additions and 8,909 deletions.
15 changes: 9 additions & 6 deletions build.gradle
@@ -75,12 +75,12 @@ ext.libraries = [
fastUtil: 'it.unimi.dsi:fastutil:8.3.0',
hadoopCommon: "org.apache.hadoop:hadoop-common:${hadoopVersion}",
helix: 'org.apache.helix:helix-core:1.1.0',
- httpAsyncClient: 'org.apache.httpcomponents:httpasyncclient:4.1.2',
- httpClient5: 'org.apache.httpcomponents.client5:httpclient5:5.2.1',
- httpCore5: 'org.apache.httpcomponents.core5:httpcore5:5.2.2',
- httpCore5H2: 'org.apache.httpcomponents.core5:httpcore5-h2:5.2.2',
- httpClient: 'org.apache.httpcomponents:httpclient:4.5.2',
- httpCore: 'org.apache.httpcomponents:httpcore:4.4.5',
+ httpAsyncClient: 'org.apache.httpcomponents:httpasyncclient:4.1.5',
+ httpClient5: 'org.apache.httpcomponents.client5:httpclient5:5.3',
+ httpCore5: 'org.apache.httpcomponents.core5:httpcore5:5.2.4',
+ httpCore5H2: 'org.apache.httpcomponents.core5:httpcore5-h2:5.2.4',
+ httpClient: 'org.apache.httpcomponents:httpclient:4.5.14',
+ httpCore: 'org.apache.httpcomponents:httpcore:4.4.16',
jacksonCore: "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}",
jacksonAnnotations: "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}",
jacksonDatabind: "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}",
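These coordinates live in the build's ext.libraries alias map rather than in a dependencies block, so bumping a version here updates every module that references the alias. A sketch of how such an alias is typically consumed elsewhere in the build (the module and configuration names here are illustrative assumptions, not part of this diff):

// build.gradle of a consuming module (hypothetical)
dependencies {
  implementation libraries.httpClient5   // -> org.apache.httpcomponents.client5:httpclient5:5.3
  implementation libraries.httpCore5H2   // -> org.apache.httpcomponents.core5:httpcore5-h2:5.2.4
}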
@@ -731,6 +731,9 @@ ext.createDiffFile = { ->
// venice-standalone
':!services/venice-standalone/*', // exclude the entire standalone project

+ // admin-tool
+ ':!clients/venice-admin-tool/*',
+
// Keep this last
// Other files that have tests but are not executed in the regular unit test task
':!internal/alpini/*'
@@ -33,6 +33,7 @@
import static com.linkedin.venice.ConfigKeys.MAX_LEADER_FOLLOWER_STATE_TRANSITION_THREAD_NUMBER;
import static com.linkedin.venice.ConfigKeys.META_STORE_WRITER_CLOSE_CONCURRENCY;
import static com.linkedin.venice.ConfigKeys.META_STORE_WRITER_CLOSE_TIMEOUT_MS;
+ import static com.linkedin.venice.ConfigKeys.MIN_CONSUMER_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER;
import static com.linkedin.venice.ConfigKeys.OFFSET_LAG_DELTA_RELAX_FACTOR_FOR_FAST_ONLINE_TRANSITION_IN_RESTART;
import static com.linkedin.venice.ConfigKeys.PARTICIPANT_MESSAGE_CONSUMPTION_DELAY_MS;
import static com.linkedin.venice.ConfigKeys.PUB_SUB_ADMIN_ADAPTER_FACTORY_CLASS;
@@ -605,11 +606,14 @@ public VeniceServerConfig(VeniceProperties serverProperties, Map<String, Map<Str
}

consumerPoolSizePerKafkaCluster = serverProperties.getInt(SERVER_CONSUMER_POOL_SIZE_PER_KAFKA_CLUSTER, 5);
- if (consumerPoolSizePerKafkaCluster < MINIMUM_CONSUMER_NUM_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER) {
+ int minimumConsumerNumInConsumerPoolPerKafkaCluster = serverProperties.getInt(
+     MIN_CONSUMER_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER,
+     VeniceServerConfig.MINIMUM_CONSUMER_NUM_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER);
+
+ if (consumerPoolSizePerKafkaCluster < minimumConsumerNumInConsumerPoolPerKafkaCluster) {
throw new VeniceException(
SERVER_CONSUMER_POOL_SIZE_PER_KAFKA_CLUSTER + " shouldn't be less than: "
- + MINIMUM_CONSUMER_NUM_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER + ", but it is "
- + consumerPoolSizePerKafkaCluster);
+ + minimumConsumerNumInConsumerPoolPerKafkaCluster + ", but it is " + consumerPoolSizePerKafkaCluster);
}
leakedResourceCleanupEnabled = serverProperties.getBoolean(SERVER_LEAKED_RESOURCE_CLEANUP_ENABLED, true);
delayReadyToServeMS = serverProperties.getLong(SERVER_DELAY_REPORT_READY_TO_SERVE_MS, 0);
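The effect of this hunk: the floor for SERVER_CONSUMER_POOL_SIZE_PER_KAFKA_CLUSTER was previously the hard-coded constant MINIMUM_CONSUMER_NUM_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER; it is now read from the new MIN_CONSUMER_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER key, with the old constant kept as the default, so operators can deliberately run a smaller pool. A sketch of what that enables from the configuration side (the VeniceProperties wiring is an assumption; only the two-argument constructor visible in the hunk is taken from the diff):

// Illustrative only: run a pool smaller than the old hard-coded minimum
// by lowering both knobs together.
Properties props = new Properties();
props.setProperty(ConfigKeys.SERVER_CONSUMER_POOL_SIZE_PER_KAFKA_CLUSTER, "2");
props.setProperty(ConfigKeys.MIN_CONSUMER_IN_CONSUMER_POOL_PER_KAFKA_CLUSTER, "2");
// 2 >= 2, so validation passes; before this change, any value below the
// hard-coded minimum threw a VeniceException.
VeniceServerConfig config =
    new VeniceServerConfig(new VeniceProperties(props), Collections.emptyMap());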
@@ -146,6 +146,7 @@ public VeniceChangelogConsumerImpl(
this.userEventChunkingAdapter = GenericChunkingAdapter.INSTANCE;
this.storeDeserializerCache = new AvroStoreDeserializerCache<>(storeRepository, storeName, true);
}
+
LOGGER.info(
"Start a change log consumer client for store: {}, with partition count: {} and view class: {} ",
storeName,
@@ -761,9 +762,7 @@ private PubSubMessage<K, ChangeEvent<V>, VeniceChangeCoordinate> convertChangeEv
}

private V deserializeValueFromBytes(ByteBuffer byteBuffer, int valueSchemaId) {
- Schema currentValueSchema = schemaReader.getValueSchema(valueSchemaId);
- RecordDeserializer<V> valueDeserializer =
-     FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(currentValueSchema, currentValueSchema);
+ RecordDeserializer<V> valueDeserializer = storeDeserializerCache.getDeserializer(valueSchemaId, valueSchemaId);
if (byteBuffer != null) {
return valueDeserializer.deserialize(byteBuffer);
}
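The rewrite of deserializeValueFromBytes drops a per-record schema fetch and deserializer construction in favor of the storeDeserializerCache built in the constructor. The general pattern, sketched below with assumed names (this is not AvroStoreDeserializerCache's actual code), is to resolve each writer/reader schema-id pair once and reuse the deserializer on the hot path:

// Sketch of the caching pattern this hunk switches to (names assumed).
private final Map<Long, RecordDeserializer<V>> cache = new ConcurrentHashMap<>();

RecordDeserializer<V> getDeserializer(int writerSchemaId, int readerSchemaId) {
  long key = ((long) writerSchemaId << 32) | (readerSchemaId & 0xFFFFFFFFL);
  return cache.computeIfAbsent(key, k -> {
    // Schema lookup and factory call now happen once per schema-id pair,
    // not once per deserialized record as in the removed lines above.
    Schema writer = schemaReader.getValueSchema(writerSchemaId);
    Schema reader = schemaReader.getValueSchema(readerSchemaId);
    return FastSerializerDeserializerFactory.getFastAvroGenericDeserializer(writer, reader);
  });
}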
@@ -7,7 +7,7 @@
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.TOP_LEVEL_COLO_ID_FIELD_POS;
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.TOP_LEVEL_TS_FIELD_POS;
import static com.linkedin.venice.schema.writecompute.WriteComputeOperation.NO_OP_ON_FIELD;
- import static com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter.getFieldOperationType;
+ import static com.linkedin.venice.schema.writecompute.WriteComputeOperation.getFieldOperationType;

import com.linkedin.davinci.replication.RmdWithValueSchemaId;
import com.linkedin.davinci.schema.merge.ValueAndRmd;
@@ -37,6 +37,7 @@
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
+ import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.SchemaReader;
import com.linkedin.venice.schema.rmd.RmdConstants;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
@@ -75,14 +76,14 @@ public class VeniceChangelogConsumerImplTest {
private Schema rmdSchema;
private SchemaReader schemaReader;
private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
+ private final Schema valueSchema = AvroCompatibilityHelper.parse("\"string\"");

@BeforeMethod
public void setUp() {
storeName = Utils.getUniqueString();
schemaReader = mock(SchemaReader.class);
Schema keySchema = AvroCompatibilityHelper.parse("\"string\"");
doReturn(keySchema).when(schemaReader).getKeySchema();
- Schema valueSchema = AvroCompatibilityHelper.parse("\"string\"");
doReturn(valueSchema).when(schemaReader).getValueSchema(1);
rmdSchema = RmdSchemaGenerator.generateMetadataSchema(valueSchema, 1);

@@ -134,6 +135,7 @@ public void testConsumeBeforeAndAfterImage() throws ExecutionException, Interrup
Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP);
Mockito.when(mockRepository.getStore(anyString())).thenReturn(store);
Mockito.when(store.getVersion(Mockito.anyInt())).thenReturn(Optional.of(mockVersion));
+ Mockito.when(mockRepository.getValueSchema(storeName, 1)).thenReturn(new SchemaEntry(1, valueSchema));

veniceChangelogConsumer.setStoreRepository(mockRepository);
veniceChangelogConsumer.subscribe(new HashSet<>(Arrays.asList(0))).get();
@@ -257,6 +259,7 @@ public void testConsumeAfterImage() throws ExecutionException, InterruptedExcept
Mockito.when(store.getCurrentVersion()).thenReturn(1);
Mockito.when(store.getCompressionStrategy()).thenReturn(CompressionStrategy.NO_OP);
Mockito.when(mockRepository.getStore(anyString())).thenReturn(store);
+ Mockito.when(mockRepository.getValueSchema(storeName, 1)).thenReturn(new SchemaEntry(1, valueSchema));
Mockito.when(store.getVersion(Mockito.anyInt())).thenReturn(Optional.of(mockVersion));
veniceChangelogConsumer.setStoreRepository(mockRepository);
veniceChangelogConsumer.subscribe(new HashSet<>(Arrays.asList(0))).get();
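The two new Mockito stubs follow from the production change above: the AvroStoreDeserializerCache is constructed with the store repository (see the constructor context in the earlier hunk), so value-schema lookups now go through the repository's getValueSchema rather than the SchemaReader, and each test has to answer that call:

// Without this stub, the repository mock would return null for the schema
// lookup and deserialization in the consumer under test would fail.
Mockito.when(mockRepository.getValueSchema(storeName, 1))
    .thenReturn(new SchemaEntry(1, valueSchema));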
@@ -45,28 +45,32 @@ public void testUpdateIgnoredFieldUpdate() {
final int incomingWriteComputeSchemaId = 3;
final int oldValueSchemaId = 3;
// Set up
- Schema writeComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(personSchemaV2);
- GenericRecord updateFieldWriteComputeRecord = AvroSchemaUtils.createGenericRecord(writeComputeSchema);
- updateFieldWriteComputeRecord.put("age", 66);
- updateFieldWriteComputeRecord.put("name", "Venice");
+ Schema writeComputeSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(personSchemaV3);
+ GenericRecord updateFieldWriteComputeRecord =
+     new UpdateBuilderImpl(writeComputeSchema).setNewFieldValue("nullableListField", null)
+         .setNewFieldValue("age", 66)
+         .setNewFieldValue("name", "Venice")
+         .build();
ByteBuffer writeComputeBytes = ByteBuffer.wrap(
MapOrderPreservingSerDeFactory.getSerializer(writeComputeSchema).serialize(updateFieldWriteComputeRecord));
final long valueLevelTimestamp = 10L;
Map<String, Long> fieldNameToTimestampMap = new HashMap<>();
+ fieldNameToTimestampMap.put("nullableListField", 10L);
fieldNameToTimestampMap.put("age", 10L);
fieldNameToTimestampMap.put("favoritePet", 10L);
fieldNameToTimestampMap.put("name", 10L);
fieldNameToTimestampMap.put("intArray", 10L);
fieldNameToTimestampMap.put("stringArray", 10L);
fieldNameToTimestampMap.put("stringMap", 10L);

- GenericRecord rmdRecord = createRmdWithFieldLevelTimestamp(personRmdSchemaV2, fieldNameToTimestampMap);
+ GenericRecord rmdRecord = createRmdWithFieldLevelTimestamp(personRmdSchemaV3, fieldNameToTimestampMap);
RmdWithValueSchemaId rmdWithValueSchemaId = new RmdWithValueSchemaId(oldValueSchemaId, RMD_VERSION_ID, rmdRecord);
ReadOnlySchemaRepository readOnlySchemaRepository = mock(ReadOnlySchemaRepository.class);
doReturn(new DerivedSchemaEntry(incomingValueSchemaId, 1, writeComputeSchema)).when(readOnlySchemaRepository)
.getDerivedSchema(storeName, incomingValueSchemaId, incomingWriteComputeSchemaId);
- doReturn(new SchemaEntry(oldValueSchemaId, personSchemaV2)).when(readOnlySchemaRepository)
+ doReturn(new SchemaEntry(oldValueSchemaId, personSchemaV3)).when(readOnlySchemaRepository)
.getValueSchema(storeName, oldValueSchemaId);
- doReturn(new SchemaEntry(oldValueSchemaId, personSchemaV2)).when(readOnlySchemaRepository)
+ doReturn(new SchemaEntry(oldValueSchemaId, personSchemaV3)).when(readOnlySchemaRepository)
.getSupersetSchema(storeName);
StringAnnotatedStoreSchemaCache stringAnnotatedStoreSchemaCache =
new StringAnnotatedStoreSchemaCache(storeName, readOnlySchemaRepository);
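The switch from raw GenericRecord.put calls to UpdateBuilderImpl is more than cosmetic: a write-compute record distinguishes "set this field" from "leave this field alone" (cf. the NO_OP_ON_FIELD import in an earlier hunk), and the builder fills in the no-op marker for every field not explicitly set, which matters once the schema gains nullableListField. Condensed from the hunk:

GenericRecord update = new UpdateBuilderImpl(writeComputeSchema)
    .setNewFieldValue("nullableListField", null) // explicit update to null
    .setNewFieldValue("age", 66)
    .setNewFieldValue("name", "Venice")
    .build();
// Fields never passed to setNewFieldValue (e.g. "intArray") are encoded as
// no-op and leave the stored value untouched during merge.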
@@ -22,6 +22,7 @@
import java.util.Set;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
+ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@@ -34,6 +35,7 @@ public class RocksDBStorageEngineTest extends AbstractStorageEngineTest {
private final ReadOnlyStoreRepository mockReadOnlyStoreRepository = mock(ReadOnlyStoreRepository.class);
private static final int versionNumber = 0;
private static final String topicName = Version.composeKafkaTopic(storeName, versionNumber);
+ private int testCount = 0;

@Override
public void createStorageEngineForTest() {
@@ -67,6 +69,24 @@ public void cleanUp() throws Exception {
storageService.stop();
}

+ @AfterMethod
+ public void testCounter() {
+   this.testCount++;
+ }
+
+ /**
+  * Some tests require a reset if other tests have run before them, as they are sensitive to contamination.
+  *
+  * Alternatively, we could make {@link #setUp()} have {@link org.testng.annotations.BeforeMethod} and
+  * {@link #cleanUp()} have {@link AfterMethod}, though that makes the class take longer than the current approach.
+  */
+ private void reset() throws Exception {
+   if (this.testCount > 0) {
+     cleanUp();
+     setUp();
+   }
+ }
+
@Test
public void testGetAndPut() {
super.testGetAndPut();
@@ -154,11 +174,13 @@ public void testGetInvalidKeys() {

@Test
public void testPartitioning() throws Exception {
+ reset();
super.testPartitioning();
}

@Test
public void testAddingAPartitionTwice() throws Exception {
+ reset();
super.testAddingAPartitionTwice();
}

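For contrast, the heavier alternative that the new Javadoc mentions would rebuild the storage engine around every test rather than only before the contamination-sensitive ones; a sketch, not part of the commit:

// Hypothetical alternative the Javadoc rejects as slower: always reset.
@BeforeMethod
public void setUpEach() throws Exception {
  setUp();
}

@AfterMethod
public void cleanUpEach() throws Exception {
  cleanUp();
}
// Correct but costly: every test pays RocksDB setup/teardown, whereas
// reset() confines that cost to the tests that need a clean engine.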
11 changes: 11 additions & 0 deletions clients/da-vinci-client/src/test/resources/avro/PersonV3.avsc
@@ -2,6 +2,17 @@
"name": "Person",
"type": "record",
"fields": [
+ {
+   "name": "nullableListField",
+   "type": [
+     "null",
+     {
+       "type": "array",
+       "items": "int"
+     }
+   ],
+   "default": null
+ },
{
"name": "age",
"type": "int",
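The new field is an optional array: a union whose first branch is "null", with a matching null default so that readers using PersonV3 can still decode records written before the field existed. A small generic-Avro sketch of the two legal shapes (the other Person fields are omitted, so this fragment alone would not serialize a complete record):

import java.io.File;
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

Schema person = new Schema.Parser().parse(new File("PersonV3.avsc")); // throws IOException
GenericRecord withList = new GenericData.Record(person);
withList.put("nullableListField", Arrays.asList(1, 2, 3)); // array branch
GenericRecord withoutList = new GenericData.Record(person);
withoutList.put("nullableListField", null);                // null branch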
2 changes: 1 addition & 1 deletion clients/venice-admin-tool/build.gradle
@@ -36,5 +36,5 @@ jar {
}

ext {
- jacocoCoverageThreshold = 0.04
+ jacocoCoverageThreshold = 0.00
}