Skip to content

Commit

Permalink
[all][server][dvc] Fast-avro upgrade, fix meta system store schema up…
Browse files Browse the repository at this point in the history
…date issue and partition size race condition (linkedin#690)

* [all][server][dvc] Fast-avro upgrade, fix meta system store schema update issue and partition size race condition

This code change includes several fixes:
1. Bump up fast-avro lib tot pick up the recent serde optimizations.
2. Fix the meta system store schema race condition issue in Venice Server.
   Previously, the update schema may not be available for the latest superset schema since there is a delay
   between registering superset schema and its derived schema. The fix will always retrieve a latest value
   schema with a valid derived schema.
3. Use RocksDB API to fetch partition size to avoid the following race condition:
   RocksDB log compaction can delete file in the middle of measuring the directory size.
4. Let 'fast.avro.field.limit.per.method' to control fast serializer size as well.
  • Loading branch information
gaojieliu authored Oct 11, 2023
1 parent 5a3b8ea commit e4d7a79
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 17 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ if (project.hasProperty('overrideBuildEnvironment')) {
}

def avroVersion = '1.10.2'
def avroUtilVersion = '0.3.18'
def avroUtilVersion = '0.3.19'
def grpcVersion = '1.49.2'
def kafkaGroup = 'com.linkedin.kafka'
def kafkaVersion = '2.4.1.65'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.linkedin.venice.pubsub.api.PubSubTopic;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import com.linkedin.venice.serialization.AvroStoreDeserializerCache;
import com.linkedin.venice.stats.StatsErrorCode;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
Expand Down Expand Up @@ -2808,8 +2809,9 @@ private void handleUpdateRequest(
final int readerValueSchemaId;
final int readerUpdateProtocolVersion;
if (isIngestingSystemStore()) {
readerValueSchemaId = schemaRepository.getSupersetOrLatestValueSchema(storeName).getId();
readerUpdateProtocolVersion = schemaRepository.getLatestDerivedSchema(storeName, readerValueSchemaId).getId();
DerivedSchemaEntry latestDerivedSchemaEntry = schemaRepository.getLatestDerivedSchema(storeName);
readerValueSchemaId = latestDerivedSchemaEntry.getValueSchemaID();
readerUpdateProtocolVersion = latestDerivedSchemaEntry.getId();
} else {
SchemaEntry supersetSchemaEntry = schemaRepository.getSupersetSchema(storeName);
if (supersetSchemaEntry == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.rocksdb.BlockBasedTableConfig;
Expand Down Expand Up @@ -923,20 +922,14 @@ public boolean verifyConfig(StoragePartitionConfig partitionConfig) {
&& writeOnly == partitionConfig.isWriteOnlyConfig();
}

/**
* This method calculates the file size by adding all subdirectories size
* @return the partition db size in bytes
*/
@Override
public long getPartitionSizeInBytes() {
File partitionDbDir = new File(fullPathForPartitionDB);
if (partitionDbDir.exists()) {
/**
* {@link FileUtils#sizeOf(File)} will throw {@link IllegalArgumentException} if the file/dir doesn't exist.
*/
return FileUtils.sizeOf(partitionDbDir);
} else {
return 0;
readCloseRWLock.readLock().lock();
try {
makeSureRocksDBIsStillOpen();
return getRocksDBStatValue("rocksdb.live-sst-files-size");
} finally {
readCloseRWLock.readLock().unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ public void testIngestion(
storagePartition.delete(toBeDeletedKey.getBytes());
Assert.assertNull(storagePartition.get(toBeDeletedKey.getBytes()));

Assert.assertTrue(storagePartition.getPartitionSizeInBytes() > 0);

Options storeOptions = storagePartition.getOptions();
Assert.assertEquals(storeOptions.level0FileNumCompactionTrigger(), 40);
storagePartition.drop();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.venice.meta;

import com.linkedin.venice.VeniceResource;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.schema.GeneratedSchemaID;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
Expand Down Expand Up @@ -68,4 +69,29 @@ public interface ReadOnlySchemaRepository extends VeniceResource {
RmdSchemaEntry getReplicationMetadataSchema(String storeName, int valueSchemaId, int replicationMetadataVersionId);

Collection<RmdSchemaEntry> getReplicationMetadataSchemas(String storeName);

default DerivedSchemaEntry getLatestDerivedSchema(String storeName) {
SchemaEntry valueSchemaEntry = getSupersetOrLatestValueSchema(storeName);
try {
return getLatestDerivedSchema(storeName, valueSchemaEntry.getId());
} catch (Exception e) {
/**
* Can't find the derived schema for the latest value schema, so it will fall back to find out the latest
* value schema, which has a valid derived schema.
*/
Collection<DerivedSchemaEntry> derivedSchemaEntries = getDerivedSchemas(storeName);
DerivedSchemaEntry latestDerivedSchemaEntry = null;
for (DerivedSchemaEntry entry: derivedSchemaEntries) {
if (latestDerivedSchemaEntry == null || entry.getValueSchemaID() > latestDerivedSchemaEntry.getValueSchemaID()
|| (entry.getValueSchemaID() == latestDerivedSchemaEntry.getValueSchemaID()
&& entry.getId() > latestDerivedSchemaEntry.getId())) {
latestDerivedSchemaEntry = entry;
}
}
if (latestDerivedSchemaEntry == null) {
throw new VeniceException("Failed to find a valid derived schema for store: " + storeName);
}
return latestDerivedSchemaEntry;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package com.linkedin.venice.meta;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import java.util.ArrayList;
import java.util.List;
import org.testng.Assert;
import org.testng.annotations.Test;


public class TestReadOnlySchemaRepository {
@Test
public void testGetLatestDerivedSchema() {
String storeName = "test_store";
// Latest value schema has a derived schema
SchemaEntry latestValueSchema = new SchemaEntry(1, "\"string\"");
ReadOnlySchemaRepository repository1 = mock(ReadOnlySchemaRepository.class);
when(repository1.getSupersetOrLatestValueSchema(storeName)).thenReturn(latestValueSchema);
DerivedSchemaEntry derivedSchemaEntry = new DerivedSchemaEntry(1, 1, "\"string\"");
when(repository1.getLatestDerivedSchema(storeName, 1)).thenReturn(derivedSchemaEntry);
when(repository1.getLatestDerivedSchema(storeName)).thenCallRealMethod();

Assert.assertEquals(repository1.getLatestDerivedSchema(storeName), derivedSchemaEntry);

// Latest value schema doesn't have a corresponding derived schema.
SchemaEntry latestSupersetSchema = new SchemaEntry(10, "\"string\"");
ReadOnlySchemaRepository repository2 = mock(ReadOnlySchemaRepository.class);
when(repository2.getSupersetOrLatestValueSchema(storeName)).thenReturn(latestSupersetSchema);
when(repository2.getLatestDerivedSchema(storeName, 10)).thenThrow(new VeniceException("No derived schema"));
List<DerivedSchemaEntry> derivedSchemaEntryList = new ArrayList<>();
DerivedSchemaEntry latestDerivedSchema = new DerivedSchemaEntry(9, 2, "\"string\"");
derivedSchemaEntryList.add(new DerivedSchemaEntry(8, 1, "\"string\""));
derivedSchemaEntryList.add(new DerivedSchemaEntry(8, 2, "\"string\""));
derivedSchemaEntryList.add(latestDerivedSchema);
derivedSchemaEntryList.add(new DerivedSchemaEntry(9, 1, "\"string\""));
when(repository2.getDerivedSchemas(storeName)).thenReturn(derivedSchemaEntryList);

when(repository2.getLatestDerivedSchema(storeName)).thenCallRealMethod();

Assert.assertEquals(repository2.getLatestDerivedSchema(storeName), latestDerivedSchema);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void testHybridStoreQuota(boolean chunkingEnabled, boolean isStreamReproc
// Do a VPJ push
runVPJ(vpjProperties, 3, controllerClient);
String topicForStoreVersion3 = Version.composeKafkaTopic(storeName, 3);
long storageQuotaInByte = 60000; // A small quota, easily violated.
long storageQuotaInByte = 20000; // A small quota, easily violated.

// Need to update store with quota here.
controllerClient.updateStore(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@
public class FastDeserializerGeneratorAccessor {
public static void setFieldsPerPopulationMethod(int limit) {
FastDeserializerGenerator.setFieldsPerPopulationMethod(limit);
FastSerializerGenerator.setFieldsPerRecordSerializationMethod(limit);
}
}

0 comments on commit e4d7a79

Please sign in to comment.