diff --git a/build.gradle b/build.gradle index 96aae6bffc..f93565cda1 100644 --- a/build.gradle +++ b/build.gradle @@ -38,7 +38,7 @@ if (project.hasProperty('overrideBuildEnvironment')) { } def avroVersion = '1.10.2' -def avroUtilVersion = '0.3.18' +def avroUtilVersion = '0.3.19' def grpcVersion = '1.49.2' def kafkaGroup = 'com.linkedin.kafka' def kafkaVersion = '2.4.1.65' diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java index 8de246074f..e52e368b32 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/LeaderFollowerStoreIngestionTask.java @@ -50,6 +50,7 @@ import com.linkedin.venice.pubsub.api.PubSubTopic; import com.linkedin.venice.pubsub.api.PubSubTopicPartition; import com.linkedin.venice.schema.SchemaEntry; +import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; import com.linkedin.venice.serialization.AvroStoreDeserializerCache; import com.linkedin.venice.stats.StatsErrorCode; import com.linkedin.venice.storage.protocol.ChunkedValueManifest; @@ -2808,8 +2809,9 @@ private void handleUpdateRequest( final int readerValueSchemaId; final int readerUpdateProtocolVersion; if (isIngestingSystemStore()) { - readerValueSchemaId = schemaRepository.getSupersetOrLatestValueSchema(storeName).getId(); - readerUpdateProtocolVersion = schemaRepository.getLatestDerivedSchema(storeName, readerValueSchemaId).getId(); + DerivedSchemaEntry latestDerivedSchemaEntry = schemaRepository.getLatestDerivedSchema(storeName); + readerValueSchemaId = latestDerivedSchemaEntry.getValueSchemaID(); + readerUpdateProtocolVersion = latestDerivedSchemaEntry.getId(); } else { SchemaEntry supersetSchemaEntry = schemaRepository.getSupersetSchema(storeName); if (supersetSchemaEntry == null) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java index 09c4d92f98..f0ba1105b9 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartition.java @@ -25,7 +25,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; import javax.annotation.concurrent.NotThreadSafe; -import org.apache.commons.io.FileUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.rocksdb.BlockBasedTableConfig; @@ -923,20 +922,14 @@ public boolean verifyConfig(StoragePartitionConfig partitionConfig) { && writeOnly == partitionConfig.isWriteOnlyConfig(); } - /** - * This method calculates the file size by adding all subdirectories size - * @return the partition db size in bytes - */ @Override public long getPartitionSizeInBytes() { - File partitionDbDir = new File(fullPathForPartitionDB); - if (partitionDbDir.exists()) { - /** - * {@link FileUtils#sizeOf(File)} will throw {@link IllegalArgumentException} if the file/dir doesn't exist. - */ - return FileUtils.sizeOf(partitionDbDir); - } else { - return 0; + readCloseRWLock.readLock().lock(); + try { + makeSureRocksDBIsStillOpen(); + return getRocksDBStatValue("rocksdb.live-sst-files-size"); + } finally { + readCloseRWLock.readLock().unlock(); } } diff --git a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartitionTest.java b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartitionTest.java index 70043e0ce9..ba638844d2 100644 --- a/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartitionTest.java +++ b/clients/da-vinci-client/src/test/java/com/linkedin/davinci/store/rocksdb/RocksDBStoragePartitionTest.java @@ -252,6 +252,8 @@ public void testIngestion( storagePartition.delete(toBeDeletedKey.getBytes()); Assert.assertNull(storagePartition.get(toBeDeletedKey.getBytes())); + Assert.assertTrue(storagePartition.getPartitionSizeInBytes() > 0); + Options storeOptions = storagePartition.getOptions(); Assert.assertEquals(storeOptions.level0FileNumCompactionTrigger(), 40); storagePartition.drop(); diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlySchemaRepository.java b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlySchemaRepository.java index 38d757a03c..10d5b5b6d6 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlySchemaRepository.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/meta/ReadOnlySchemaRepository.java @@ -1,6 +1,7 @@ package com.linkedin.venice.meta; import com.linkedin.venice.VeniceResource; +import com.linkedin.venice.exceptions.VeniceException; import com.linkedin.venice.schema.GeneratedSchemaID; import com.linkedin.venice.schema.SchemaEntry; import com.linkedin.venice.schema.rmd.RmdSchemaEntry; @@ -68,4 +69,29 @@ public interface ReadOnlySchemaRepository extends VeniceResource { RmdSchemaEntry getReplicationMetadataSchema(String storeName, int valueSchemaId, int replicationMetadataVersionId); Collection getReplicationMetadataSchemas(String storeName); + + default DerivedSchemaEntry getLatestDerivedSchema(String storeName) { + SchemaEntry valueSchemaEntry = getSupersetOrLatestValueSchema(storeName); + try { + return getLatestDerivedSchema(storeName, valueSchemaEntry.getId()); + } catch (Exception e) { + /** + * Can't find the derived schema for the latest value schema, so it will fall back to find out the latest + * value schema, which has a valid derived schema. + */ + Collection derivedSchemaEntries = getDerivedSchemas(storeName); + DerivedSchemaEntry latestDerivedSchemaEntry = null; + for (DerivedSchemaEntry entry: derivedSchemaEntries) { + if (latestDerivedSchemaEntry == null || entry.getValueSchemaID() > latestDerivedSchemaEntry.getValueSchemaID() + || (entry.getValueSchemaID() == latestDerivedSchemaEntry.getValueSchemaID() + && entry.getId() > latestDerivedSchemaEntry.getId())) { + latestDerivedSchemaEntry = entry; + } + } + if (latestDerivedSchemaEntry == null) { + throw new VeniceException("Failed to find a valid derived schema for store: " + storeName); + } + return latestDerivedSchemaEntry; + } + } } diff --git a/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestReadOnlySchemaRepository.java b/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestReadOnlySchemaRepository.java new file mode 100644 index 0000000000..13bb456011 --- /dev/null +++ b/internal/venice-common/src/test/java/com/linkedin/venice/meta/TestReadOnlySchemaRepository.java @@ -0,0 +1,47 @@ +package com.linkedin.venice.meta; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.linkedin.venice.exceptions.VeniceException; +import com.linkedin.venice.schema.SchemaEntry; +import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry; +import java.util.ArrayList; +import java.util.List; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class TestReadOnlySchemaRepository { + @Test + public void testGetLatestDerivedSchema() { + String storeName = "test_store"; + // Latest value schema has a derived schema + SchemaEntry latestValueSchema = new SchemaEntry(1, "\"string\""); + ReadOnlySchemaRepository repository1 = mock(ReadOnlySchemaRepository.class); + when(repository1.getSupersetOrLatestValueSchema(storeName)).thenReturn(latestValueSchema); + DerivedSchemaEntry derivedSchemaEntry = new DerivedSchemaEntry(1, 1, "\"string\""); + when(repository1.getLatestDerivedSchema(storeName, 1)).thenReturn(derivedSchemaEntry); + when(repository1.getLatestDerivedSchema(storeName)).thenCallRealMethod(); + + Assert.assertEquals(repository1.getLatestDerivedSchema(storeName), derivedSchemaEntry); + + // Latest value schema doesn't have a corresponding derived schema. + SchemaEntry latestSupersetSchema = new SchemaEntry(10, "\"string\""); + ReadOnlySchemaRepository repository2 = mock(ReadOnlySchemaRepository.class); + when(repository2.getSupersetOrLatestValueSchema(storeName)).thenReturn(latestSupersetSchema); + when(repository2.getLatestDerivedSchema(storeName, 10)).thenThrow(new VeniceException("No derived schema")); + List derivedSchemaEntryList = new ArrayList<>(); + DerivedSchemaEntry latestDerivedSchema = new DerivedSchemaEntry(9, 2, "\"string\""); + derivedSchemaEntryList.add(new DerivedSchemaEntry(8, 1, "\"string\"")); + derivedSchemaEntryList.add(new DerivedSchemaEntry(8, 2, "\"string\"")); + derivedSchemaEntryList.add(latestDerivedSchema); + derivedSchemaEntryList.add(new DerivedSchemaEntry(9, 1, "\"string\"")); + when(repository2.getDerivedSchemas(storeName)).thenReturn(derivedSchemaEntryList); + + when(repository2.getLatestDerivedSchema(storeName)).thenCallRealMethod(); + + Assert.assertEquals(repository2.getLatestDerivedSchema(storeName), latestDerivedSchema); + } + +} diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestHybridQuota.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestHybridQuota.java index 06066a4f3c..66148e58d4 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestHybridQuota.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/TestHybridQuota.java @@ -222,7 +222,7 @@ public void testHybridStoreQuota(boolean chunkingEnabled, boolean isStreamReproc // Do a VPJ push runVPJ(vpjProperties, 3, controllerClient); String topicForStoreVersion3 = Version.composeKafkaTopic(storeName, 3); - long storageQuotaInByte = 60000; // A small quota, easily violated. + long storageQuotaInByte = 20000; // A small quota, easily violated. // Need to update store with quota here. controllerClient.updateStore( diff --git a/services/venice-server/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorAccessor.java b/services/venice-server/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorAccessor.java index 93450fcf24..b86e1a820c 100644 --- a/services/venice-server/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorAccessor.java +++ b/services/venice-server/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorAccessor.java @@ -6,5 +6,6 @@ public class FastDeserializerGeneratorAccessor { public static void setFieldsPerPopulationMethod(int limit) { FastDeserializerGenerator.setFieldsPerPopulationMethod(limit); + FastSerializerGenerator.setFieldsPerRecordSerializationMethod(limit); } }