Skip to content

Commit

Permalink
[test] Add RMD CF data validation for A/A partial update TTL repush i…
Browse files Browse the repository at this point in the history
…ntegration test
  • Loading branch information
sixpluszero committed Nov 9, 2023
1 parent dcfcd39 commit fda5dba
Showing 1 changed file with 82 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_AGGREGATE;
import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_PARENT_CONTROLLER_D2_SERVICE;
import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_PARENT_D2_ZK_HOSTS;
import static com.linkedin.venice.schema.rmd.RmdConstants.TIMESTAMP_FIELD_NAME;
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.ACTIVE_ELEM_TS_FIELD_NAME;
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.DELETED_ELEM_TS_FIELD_NAME;
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.TOP_LEVEL_TS_FIELD_NAME;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.getSamzaProducer;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.getSamzaProducerConfig;
Expand Down Expand Up @@ -68,7 +71,6 @@
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.samza.VeniceSystemFactory;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.rmd.RmdConstants;
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
Expand Down Expand Up @@ -740,10 +742,10 @@ public void testActiveActivePartialUpdateWithCompression(CompressionStrategy com
});
// Validate RMD bytes after PUT requests.
validateRmdData(rmdSerDe, kafkaTopic_v1, key, rmdWithValueSchemaId -> {
GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get("timestamp");
GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
GenericRecord collectionFieldTimestampRecord = (GenericRecord) timestampRecord.get(listFieldName);
List<Long> activeElementsTimestamps =
(List<Long>) collectionFieldTimestampRecord.get("activeElementsTimestamps");
(List<Long>) collectionFieldTimestampRecord.get(ACTIVE_ELEM_TS_FIELD_NAME);
assertEquals(activeElementsTimestamps.size(), totalUpdateCount * singleUpdateEntryCount);
});
TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS, true, () -> {
Expand Down Expand Up @@ -795,10 +797,10 @@ public void testActiveActivePartialUpdateWithCompression(CompressionStrategy com
// Validate RMD bytes after PUT requests.
String kafkaTopic_v2 = Version.composeKafkaTopic(storeName, 2);
validateRmdData(rmdSerDe, kafkaTopic_v2, key, rmdWithValueSchemaId -> {
GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get("timestamp");
GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
GenericRecord collectionFieldTimestampRecord = (GenericRecord) timestampRecord.get(listFieldName);
List<Long> activeElementsTimestamps =
(List<Long>) collectionFieldTimestampRecord.get("activeElementsTimestamps");
(List<Long>) collectionFieldTimestampRecord.get(ACTIVE_ELEM_TS_FIELD_NAME);
assertEquals(activeElementsTimestamps.size(), totalUpdateCount * singleUpdateEntryCount);
});

Expand All @@ -813,13 +815,13 @@ public void testActiveActivePartialUpdateWithCompression(CompressionStrategy com
});

validateRmdData(rmdSerDe, kafkaTopic_v2, key, rmdWithValueSchemaId -> {
GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get("timestamp");
GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
GenericRecord collectionFieldTimestampRecord = (GenericRecord) timestampRecord.get(listFieldName);
List<Long> activeElementsTimestamps =
(List<Long>) collectionFieldTimestampRecord.get("activeElementsTimestamps");
(List<Long>) collectionFieldTimestampRecord.get(ACTIVE_ELEM_TS_FIELD_NAME);
assertEquals(activeElementsTimestamps.size(), singleUpdateEntryCount);
List<Long> deletedElementsTimestamps =
(List<Long>) collectionFieldTimestampRecord.get("deletedElementsTimestamps");
(List<Long>) collectionFieldTimestampRecord.get(DELETED_ELEM_TS_FIELD_NAME);
assertEquals(deletedElementsTimestamps.size(), 0);
});

Expand All @@ -831,10 +833,8 @@ public void testActiveActivePartialUpdateWithCompression(CompressionStrategy com
assertTrue(nullRecord);
});
validateRmdData(rmdSerDe, kafkaTopic_v2, key, rmdWithValueSchemaId -> {
Assert.assertTrue(
rmdWithValueSchemaId.getRmdRecord().get(RmdConstants.TIMESTAMP_FIELD_NAME) instanceof GenericRecord);
GenericRecord timestampRecord =
(GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(RmdConstants.TIMESTAMP_FIELD_NAME);
Assert.assertTrue(rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME) instanceof GenericRecord);
GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
GenericRecord collectionFieldTimestampRecord = (GenericRecord) timestampRecord.get(listFieldName);
assertEquals(collectionFieldTimestampRecord.get(TOP_LEVEL_TS_FIELD_NAME), (long) (totalUpdateCount) * 10);
});
Expand All @@ -850,6 +850,15 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
Schema valueSchema = AvroCompatibilityHelper.parse(loadFileAsString("CollectionRecordV1.avsc"));
Schema partialUpdateSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchema);

Schema rmdSchema = RmdSchemaGenerator.generateMetadataSchema(valueSchema);
ReadOnlySchemaRepository schemaRepo = mock(ReadOnlySchemaRepository.class);
when(schemaRepo.getReplicationMetadataSchema(storeName, 1, 1)).thenReturn(new RmdSchemaEntry(1, 1, rmdSchema));
when(schemaRepo.getDerivedSchema(storeName, 1, 1)).thenReturn(new DerivedSchemaEntry(1, 1, partialUpdateSchema));
when(schemaRepo.getValueSchema(storeName, 1)).thenReturn(new SchemaEntry(1, valueSchema));
StringAnnotatedStoreSchemaCache stringAnnotatedStoreSchemaCache =
new StringAnnotatedStoreSchemaCache(storeName, schemaRepo);
RmdSerDe rmdSerDe = new RmdSerDe(stringAnnotatedStoreSchemaCache, 1);

try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAME, parentControllerUrl)) {
assertCommand(
parentControllerClient
Expand Down Expand Up @@ -880,38 +889,41 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {

VeniceClusterWrapper veniceCluster = childDatacenters.get(0).getClusters().get(CLUSTER_NAME);
SystemProducer veniceProducer = getSamzaProducer(veniceCluster, storeName, Version.PushType.STREAM);

long STALE_TS = 99999L;
long FRESH_TS = 100000L;
String STRING_MAP_FILED = "stringMap";
String REGULAR_FIELD = "name";
/**
* Case 1: The record is partially stale, TTL repush should only keep the part that's fresh.
*/
String key1 = "key1";
// This update is expected to be carried into TTL repush.
UpdateBuilder updateBuilder = new UpdateBuilderImpl(partialUpdateSchema);
updateBuilder.setEntriesToAddToMapField("stringMap", Collections.singletonMap("k1", "v1"));
sendStreamingRecord(veniceProducer, storeName, key1, updateBuilder.build(), 100000L);
updateBuilder.setEntriesToAddToMapField(STRING_MAP_FILED, Collections.singletonMap("k1", "v1"));
sendStreamingRecord(veniceProducer, storeName, key1, updateBuilder.build(), FRESH_TS);
// This update is expected to be WIPED OUT after TTL repush.
updateBuilder = new UpdateBuilderImpl(partialUpdateSchema);
updateBuilder.setNewFieldValue("name", "new_name");
updateBuilder.setEntriesToAddToMapField("stringMap", Collections.singletonMap("k2", "v2"));
sendStreamingRecord(veniceProducer, storeName, key1, updateBuilder.build(), 99999L);
updateBuilder.setNewFieldValue(REGULAR_FIELD, "new_name");
updateBuilder.setEntriesToAddToMapField(STRING_MAP_FILED, Collections.singletonMap("k2", "v2"));
sendStreamingRecord(veniceProducer, storeName, key1, updateBuilder.build(), STALE_TS);

/**
* Case 2: The record is fully stale, TTL repush should drop the record.
*/
String key2 = "key2";
// This update is expected to be WIPED OUT after TTL repush.
updateBuilder = new UpdateBuilderImpl(partialUpdateSchema);
updateBuilder.setNewFieldValue("name", "new_name_2");
sendStreamingRecord(veniceProducer, storeName, key2, updateBuilder.build(), 99999L);
updateBuilder.setNewFieldValue(REGULAR_FIELD, "new_name_2");
sendStreamingRecord(veniceProducer, storeName, key2, updateBuilder.build(), STALE_TS);

/**
* Case 3: The record is fully fresh, TTL repush should keep the record.
*/
String key3 = "key3";
// This update is expected to be carried into TTL repush.
updateBuilder = new UpdateBuilderImpl(partialUpdateSchema);
updateBuilder.setNewFieldValue("name", "new_name_3");
sendStreamingRecord(veniceProducer, storeName, key3, updateBuilder.build(), 100000L);
updateBuilder.setNewFieldValue(REGULAR_FIELD, "new_name_3");
sendStreamingRecord(veniceProducer, storeName, key3, updateBuilder.build(), FRESH_TS);

/**
* Validate the data is ready in storage before TTL repush.
Expand All @@ -922,25 +934,45 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
try {
GenericRecord valueRecord = readValue(storeReader, key1);
assertNotNull(valueRecord);
assertEquals(valueRecord.get("name"), new Utf8("new_name"));
assertNotNull(valueRecord.get("stringMap"));
Map<Utf8, Utf8> stringMapValue = (Map<Utf8, Utf8>) valueRecord.get("stringMap");
assertEquals(valueRecord.get(REGULAR_FIELD), new Utf8("new_name"));
assertNotNull(valueRecord.get(STRING_MAP_FILED));
Map<Utf8, Utf8> stringMapValue = (Map<Utf8, Utf8>) valueRecord.get(STRING_MAP_FILED);
assertEquals(stringMapValue.get(new Utf8("k1")), new Utf8("v1"));
assertEquals(stringMapValue.get(new Utf8("k2")), new Utf8("v2"));

valueRecord = readValue(storeReader, key2);
assertNotNull(valueRecord);
assertEquals(valueRecord.get("name"), new Utf8("new_name_2"));
assertEquals(valueRecord.get(REGULAR_FIELD), new Utf8("new_name_2"));

valueRecord = readValue(storeReader, key3);
assertNotNull(valueRecord);
assertEquals(valueRecord.get("name"), new Utf8("new_name_3"));
assertEquals(valueRecord.get(REGULAR_FIELD), new Utf8("new_name_3"));
} catch (Exception e) {
throw new VeniceException(e);
}
});
}

String kafkaTopic_v1 = Version.composeKafkaTopic(storeName, 1);
// Validate RMD bytes after initial update requests.
validateRmdData(rmdSerDe, kafkaTopic_v1, key1, rmdWithValueSchemaId -> {
GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
GenericRecord stringMapTsRecord = (GenericRecord) timestampRecord.get(STRING_MAP_FILED);
Assert.assertEquals(stringMapTsRecord.get(TOP_LEVEL_TS_FIELD_NAME), 0L);
Assert.assertEquals(stringMapTsRecord.get(ACTIVE_ELEM_TS_FIELD_NAME), Arrays.asList(STALE_TS, FRESH_TS));
Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), STALE_TS);
});

validateRmdData(rmdSerDe, kafkaTopic_v1, key2, rmdWithValueSchemaId -> {
GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), STALE_TS);
});

validateRmdData(rmdSerDe, kafkaTopic_v1, key3, rmdWithValueSchemaId -> {
GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), FRESH_TS);
});

// Perform one time repush to make sure repush can handle RMD chunks data correctly.
Properties props =
IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, "dummyInputPath", storeName);
Expand Down Expand Up @@ -971,9 +1003,9 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
// Key 1 is partially preserved.
GenericRecord valueRecord = readValue(storeReader, key1);
assertNotNull(valueRecord);
assertEquals(valueRecord.get("name"), new Utf8("default_name"));
assertNotNull(valueRecord.get("stringMap"));
Map<Utf8, Utf8> stringMapValue = (Map<Utf8, Utf8>) valueRecord.get("stringMap");
assertEquals(valueRecord.get(REGULAR_FIELD), new Utf8("default_name"));
assertNotNull(valueRecord.get(STRING_MAP_FILED));
Map<Utf8, Utf8> stringMapValue = (Map<Utf8, Utf8>) valueRecord.get(STRING_MAP_FILED);
assertEquals(stringMapValue.get(new Utf8("k1")), new Utf8("v1"));
assertNull(stringMapValue.get(new Utf8("k2")));

Expand All @@ -984,13 +1016,32 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
// Key 3 is fully preserved.
valueRecord = readValue(storeReader, key3);
assertNotNull(valueRecord);
assertEquals(valueRecord.get("name"), new Utf8("new_name_3"));
assertEquals(valueRecord.get(REGULAR_FIELD), new Utf8("new_name_3"));
} catch (Exception e) {
throw new VeniceException(e);
}
});
}

String kafkaTopic_v2 = Version.composeKafkaTopic(storeName, 2);
// Validate RMD bytes after TTL repush.
validateRmdData(rmdSerDe, kafkaTopic_v2, key1, rmdWithValueSchemaId -> {
GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
GenericRecord stringMapTsRecord = (GenericRecord) timestampRecord.get(STRING_MAP_FILED);
Assert.assertEquals(stringMapTsRecord.get(TOP_LEVEL_TS_FIELD_NAME), STALE_TS);
Assert.assertEquals(stringMapTsRecord.get(ACTIVE_ELEM_TS_FIELD_NAME), Collections.singletonList(FRESH_TS));
Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), STALE_TS);
});
validateRmdData(rmdSerDe, kafkaTopic_v1, key2, rmdWithValueSchemaId -> {
GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), STALE_TS);
});

validateRmdData(rmdSerDe, kafkaTopic_v1, key3, rmdWithValueSchemaId -> {
GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), FRESH_TS);
});

}

private void validateRmdData(
Expand Down

0 comments on commit fda5dba

Please sign in to comment.