[test] Add RMD CF data validation for A/A partial update TTL repush integration test #745

Merged · 1 commit · Nov 13, 2023
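This change extends the A/A partial-update TTL repush integration test to validate the replication metadata (RMD) conflict-resolution data itself, not just the merged values: it builds an RmdSerDe against a mocked schema repository and asserts the per-field RMD timestamps for each key both before and after the TTL repush.

For orientation, a minimal sketch (not part of the diff) of the RMD record shape those assertions walk. The field-name constants are the static imports added below; rmdRecord stands in for RmdWithValueSchemaId#getRmdRecord(), and the value fields "name" and "stringMap" come from the test's CollectionRecordV1.avsc.

import static com.linkedin.venice.schema.rmd.RmdConstants.TIMESTAMP_FIELD_NAME;
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.ACTIVE_ELEM_TS_FIELD_NAME;
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.DELETED_ELEM_TS_FIELD_NAME;
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.TOP_LEVEL_TS_FIELD_NAME;

import java.util.List;
import org.apache.avro.generic.GenericRecord;

final class RmdLayoutSketch {
  @SuppressWarnings("unchecked")
  static void walk(GenericRecord rmdRecord) {
    // The RMD record holds one timestamp entry per value-schema field.
    GenericRecord timestampRecord = (GenericRecord) rmdRecord.get(TIMESTAMP_FIELD_NAME);
    // A primitive field ("name") carries a single long update timestamp.
    long nameTs = (Long) timestampRecord.get("name");
    // A collection field ("stringMap") carries a nested CollectionRmdTimestamp record.
    GenericRecord mapTs = (GenericRecord) timestampRecord.get("stringMap");
    long topLevelTs = (Long) mapTs.get(TOP_LEVEL_TS_FIELD_NAME); // whole-field put/delete time
    List<Long> activeTs = (List<Long>) mapTs.get(ACTIVE_ELEM_TS_FIELD_NAME); // one entry per live element
    List<Long> deletedTs = (List<Long>) mapTs.get(DELETED_ELEM_TS_FIELD_NAME); // one entry per tombstoned element
    System.out.printf("name=%d topLevel=%d active=%s deleted=%s%n", nameTs, topLevelTs, activeTs, deletedTs);
  }
}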
Changes from all commits
@@ -15,6 +15,9 @@
import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_AGGREGATE;
import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_PARENT_CONTROLLER_D2_SERVICE;
import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_PARENT_D2_ZK_HOSTS;
+ import static com.linkedin.venice.schema.rmd.RmdConstants.TIMESTAMP_FIELD_NAME;
+ import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.ACTIVE_ELEM_TS_FIELD_NAME;
+ import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.DELETED_ELEM_TS_FIELD_NAME;
import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.TOP_LEVEL_TS_FIELD_NAME;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.getSamzaProducer;
import static com.linkedin.venice.utils.IntegrationTestPushUtils.getSamzaProducerConfig;
@@ -68,7 +71,6 @@
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.samza.VeniceSystemFactory;
import com.linkedin.venice.schema.SchemaEntry;
- import com.linkedin.venice.schema.rmd.RmdConstants;
import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
@@ -740,10 +742,10 @@ public void testActiveActivePartialUpdateWithCompression(CompressionStrategy com
});
// Validate RMD bytes after PUT requests.
validateRmdData(rmdSerDe, kafkaTopic_v1, key, rmdWithValueSchemaId -> {
- GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get("timestamp");
+ GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
GenericRecord collectionFieldTimestampRecord = (GenericRecord) timestampRecord.get(listFieldName);
List<Long> activeElementsTimestamps =
-     (List<Long>) collectionFieldTimestampRecord.get("activeElementsTimestamps");
+     (List<Long>) collectionFieldTimestampRecord.get(ACTIVE_ELEM_TS_FIELD_NAME);
assertEquals(activeElementsTimestamps.size(), totalUpdateCount * singleUpdateEntryCount);
});
TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS, true, () -> {
@@ -795,10 +797,10 @@ public void testActiveActivePartialUpdateWithCompression(CompressionStrategy com
// Validate RMD bytes after PUT requests.
String kafkaTopic_v2 = Version.composeKafkaTopic(storeName, 2);
validateRmdData(rmdSerDe, kafkaTopic_v2, key, rmdWithValueSchemaId -> {
- GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get("timestamp");
+ GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
GenericRecord collectionFieldTimestampRecord = (GenericRecord) timestampRecord.get(listFieldName);
List<Long> activeElementsTimestamps =
-     (List<Long>) collectionFieldTimestampRecord.get("activeElementsTimestamps");
+     (List<Long>) collectionFieldTimestampRecord.get(ACTIVE_ELEM_TS_FIELD_NAME);
assertEquals(activeElementsTimestamps.size(), totalUpdateCount * singleUpdateEntryCount);
});

@@ -813,13 +815,13 @@ public void testActiveActivePartialUpdateWithCompression(CompressionStrategy com
});

validateRmdData(rmdSerDe, kafkaTopic_v2, key, rmdWithValueSchemaId -> {
- GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get("timestamp");
+ GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
GenericRecord collectionFieldTimestampRecord = (GenericRecord) timestampRecord.get(listFieldName);
List<Long> activeElementsTimestamps =
-     (List<Long>) collectionFieldTimestampRecord.get("activeElementsTimestamps");
+     (List<Long>) collectionFieldTimestampRecord.get(ACTIVE_ELEM_TS_FIELD_NAME);
assertEquals(activeElementsTimestamps.size(), singleUpdateEntryCount);
List<Long> deletedElementsTimestamps =
-     (List<Long>) collectionFieldTimestampRecord.get("deletedElementsTimestamps");
+     (List<Long>) collectionFieldTimestampRecord.get(DELETED_ELEM_TS_FIELD_NAME);
assertEquals(deletedElementsTimestamps.size(), 0);
});

@@ -831,10 +833,8 @@ public void testActiveActivePartialUpdateWithCompression(CompressionStrategy com
assertTrue(nullRecord);
});
validateRmdData(rmdSerDe, kafkaTopic_v2, key, rmdWithValueSchemaId -> {
- Assert.assertTrue(
-     rmdWithValueSchemaId.getRmdRecord().get(RmdConstants.TIMESTAMP_FIELD_NAME) instanceof GenericRecord);
- GenericRecord timestampRecord =
-     (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(RmdConstants.TIMESTAMP_FIELD_NAME);
+ Assert.assertTrue(rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME) instanceof GenericRecord);
+ GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
GenericRecord collectionFieldTimestampRecord = (GenericRecord) timestampRecord.get(listFieldName);
assertEquals(collectionFieldTimestampRecord.get(TOP_LEVEL_TS_FIELD_NAME), (long) (totalUpdateCount) * 10);
});
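// Note (annotation, not part of the diff): once the record is deleted, the per-element
// timestamps are expected to collapse, and the deletion time surfaces in the collection's
// top-level timestamp; hence the assertion above switches from checking the size of
// ACTIVE_ELEM_TS_FIELD_NAME to checking TOP_LEVEL_TS_FIELD_NAME.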
@@ -850,6 +850,15 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
Schema valueSchema = AvroCompatibilityHelper.parse(loadFileAsString("CollectionRecordV1.avsc"));
Schema partialUpdateSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchema);

+ Schema rmdSchema = RmdSchemaGenerator.generateMetadataSchema(valueSchema);
+ ReadOnlySchemaRepository schemaRepo = mock(ReadOnlySchemaRepository.class);
+ when(schemaRepo.getReplicationMetadataSchema(storeName, 1, 1)).thenReturn(new RmdSchemaEntry(1, 1, rmdSchema));
+ when(schemaRepo.getDerivedSchema(storeName, 1, 1)).thenReturn(new DerivedSchemaEntry(1, 1, partialUpdateSchema));
+ when(schemaRepo.getValueSchema(storeName, 1)).thenReturn(new SchemaEntry(1, valueSchema));
+ StringAnnotatedStoreSchemaCache stringAnnotatedStoreSchemaCache =
+     new StringAnnotatedStoreSchemaCache(storeName, schemaRepo);
+ RmdSerDe rmdSerDe = new RmdSerDe(stringAnnotatedStoreSchemaCache, 1);
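// Why the mocked repo (annotation, not part of the diff): the RMD assertions below decode raw
// RMD bytes read back for this store's version topics, so the RmdSerDe must resolve exactly the
// schemas the servers wrote with: value schema v1, its derived write-compute schema, and the RMD
// schema generated from the value schema. Mocking ReadOnlySchemaRepository pins all ids to 1
// without a live controller lookup; the trailing 1 in the RmdSerDe constructor lines up with the
// rmdVersionId used in getReplicationMetadataSchema(storeName, 1, 1).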

try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAME, parentControllerUrl)) {
assertCommand(
parentControllerClient
@@ -880,38 +889,41 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {

VeniceClusterWrapper veniceCluster = childDatacenters.get(0).getClusters().get(CLUSTER_NAME);
SystemProducer veniceProducer = getSamzaProducer(veniceCluster, storeName, Version.PushType.STREAM);

+ long STALE_TS = 99999L;
+ long FRESH_TS = 100000L;
+ String STRING_MAP_FIELD = "stringMap";
+ String REGULAR_FIELD = "name";
/**
* Case 1: The record is partially stale, TTL repush should only keep the part that's fresh.
*/
String key1 = "key1";
// This update is expected to be carried into TTL repush.
UpdateBuilder updateBuilder = new UpdateBuilderImpl(partialUpdateSchema);
- updateBuilder.setEntriesToAddToMapField("stringMap", Collections.singletonMap("k1", "v1"));
- sendStreamingRecord(veniceProducer, storeName, key1, updateBuilder.build(), 100000L);
+ updateBuilder.setEntriesToAddToMapField(STRING_MAP_FIELD, Collections.singletonMap("k1", "v1"));
+ sendStreamingRecord(veniceProducer, storeName, key1, updateBuilder.build(), FRESH_TS);
// This update is expected to be WIPED OUT after TTL repush.
updateBuilder = new UpdateBuilderImpl(partialUpdateSchema);
- updateBuilder.setNewFieldValue("name", "new_name");
- updateBuilder.setEntriesToAddToMapField("stringMap", Collections.singletonMap("k2", "v2"));
- sendStreamingRecord(veniceProducer, storeName, key1, updateBuilder.build(), 99999L);
+ updateBuilder.setNewFieldValue(REGULAR_FIELD, "new_name");
+ updateBuilder.setEntriesToAddToMapField(STRING_MAP_FIELD, Collections.singletonMap("k2", "v2"));
+ sendStreamingRecord(veniceProducer, storeName, key1, updateBuilder.build(), STALE_TS);

/**
* Case 2: The record is fully stale, TTL repush should drop the record.
*/
String key2 = "key2";
// This update is expected to be WIPED OUT after TTL repush.
updateBuilder = new UpdateBuilderImpl(partialUpdateSchema);
- updateBuilder.setNewFieldValue("name", "new_name_2");
- sendStreamingRecord(veniceProducer, storeName, key2, updateBuilder.build(), 99999L);
+ updateBuilder.setNewFieldValue(REGULAR_FIELD, "new_name_2");
+ sendStreamingRecord(veniceProducer, storeName, key2, updateBuilder.build(), STALE_TS);

/**
* Case 3: The record is fully fresh, TTL repush should keep the record.
*/
String key3 = "key3";
// This update is expected to be carried into TTL repush.
updateBuilder = new UpdateBuilderImpl(partialUpdateSchema);
- updateBuilder.setNewFieldValue("name", "new_name_3");
- sendStreamingRecord(veniceProducer, storeName, key3, updateBuilder.build(), 100000L);
+ updateBuilder.setNewFieldValue(REGULAR_FIELD, "new_name_3");
+ sendStreamingRecord(veniceProducer, storeName, key3, updateBuilder.build(), FRESH_TS);
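// TTL semantics assumed by these three cases (annotation, not part of the diff): a field
// survives the repush iff its RMD timestamp is at or above the TTL cutoff the repush job
// computes, i.e. roughly keep = fieldRmdTs >= cutoffTs. STALE_TS (99999) and FRESH_TS (100000)
// are chosen to straddle that cutoff, so stale writes are wiped while fresh ones survive;
// cutoffTs is hypothetical shorthand here, as the job's threshold is not shown in this diff.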

/**
* Validate the data is ready in storage before TTL repush.
@@ -922,25 +934,45 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
try {
GenericRecord valueRecord = readValue(storeReader, key1);
assertNotNull(valueRecord);
assertEquals(valueRecord.get("name"), new Utf8("new_name"));
assertNotNull(valueRecord.get("stringMap"));
Map<Utf8, Utf8> stringMapValue = (Map<Utf8, Utf8>) valueRecord.get("stringMap");
assertEquals(valueRecord.get(REGULAR_FIELD), new Utf8("new_name"));
assertNotNull(valueRecord.get(STRING_MAP_FILED));
Map<Utf8, Utf8> stringMapValue = (Map<Utf8, Utf8>) valueRecord.get(STRING_MAP_FILED);
assertEquals(stringMapValue.get(new Utf8("k1")), new Utf8("v1"));
assertEquals(stringMapValue.get(new Utf8("k2")), new Utf8("v2"));

valueRecord = readValue(storeReader, key2);
assertNotNull(valueRecord);
assertEquals(valueRecord.get("name"), new Utf8("new_name_2"));
assertEquals(valueRecord.get(REGULAR_FIELD), new Utf8("new_name_2"));

valueRecord = readValue(storeReader, key3);
assertNotNull(valueRecord);
assertEquals(valueRecord.get("name"), new Utf8("new_name_3"));
assertEquals(valueRecord.get(REGULAR_FIELD), new Utf8("new_name_3"));
} catch (Exception e) {
throw new VeniceException(e);
}
});
}

+ String kafkaTopic_v1 = Version.composeKafkaTopic(storeName, 1);
+ // Validate RMD bytes after initial update requests.
+ validateRmdData(rmdSerDe, kafkaTopic_v1, key1, rmdWithValueSchemaId -> {
+   GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
+   GenericRecord stringMapTsRecord = (GenericRecord) timestampRecord.get(STRING_MAP_FIELD);
+   Assert.assertEquals(stringMapTsRecord.get(TOP_LEVEL_TS_FIELD_NAME), 0L);
+   Assert.assertEquals(stringMapTsRecord.get(ACTIVE_ELEM_TS_FIELD_NAME), Arrays.asList(STALE_TS, FRESH_TS));
+   Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), STALE_TS);
+ });
+
+ validateRmdData(rmdSerDe, kafkaTopic_v1, key2, rmdWithValueSchemaId -> {
+   GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
+   Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), STALE_TS);
+ });
+
+ validateRmdData(rmdSerDe, kafkaTopic_v1, key3, rmdWithValueSchemaId -> {
+   GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
+   Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), FRESH_TS);
+ });

// Perform one time repush to make sure repush can handle RMD chunks data correctly.
Properties props =
IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, "dummyInputPath", storeName);
@@ -971,9 +1003,9 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
// Key 1 is partially preserved.
GenericRecord valueRecord = readValue(storeReader, key1);
assertNotNull(valueRecord);
assertEquals(valueRecord.get("name"), new Utf8("default_name"));
assertNotNull(valueRecord.get("stringMap"));
Map<Utf8, Utf8> stringMapValue = (Map<Utf8, Utf8>) valueRecord.get("stringMap");
assertEquals(valueRecord.get(REGULAR_FIELD), new Utf8("default_name"));
assertNotNull(valueRecord.get(STRING_MAP_FILED));
Map<Utf8, Utf8> stringMapValue = (Map<Utf8, Utf8>) valueRecord.get(STRING_MAP_FILED);
assertEquals(stringMapValue.get(new Utf8("k1")), new Utf8("v1"));
assertNull(stringMapValue.get(new Utf8("k2")));

@@ -984,13 +1016,32 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
// Key 3 is fully preserved.
valueRecord = readValue(storeReader, key3);
assertNotNull(valueRecord);
assertEquals(valueRecord.get("name"), new Utf8("new_name_3"));
assertEquals(valueRecord.get(REGULAR_FIELD), new Utf8("new_name_3"));
} catch (Exception e) {
throw new VeniceException(e);
}
});
}

+ String kafkaTopic_v2 = Version.composeKafkaTopic(storeName, 2);
+ // Validate RMD bytes after TTL repush.
+ validateRmdData(rmdSerDe, kafkaTopic_v2, key1, rmdWithValueSchemaId -> {
+   GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
+   GenericRecord stringMapTsRecord = (GenericRecord) timestampRecord.get(STRING_MAP_FIELD);
+   Assert.assertEquals(stringMapTsRecord.get(TOP_LEVEL_TS_FIELD_NAME), STALE_TS);
+   Assert.assertEquals(stringMapTsRecord.get(ACTIVE_ELEM_TS_FIELD_NAME), Collections.singletonList(FRESH_TS));
+   Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), STALE_TS);
+ });
+ validateRmdData(rmdSerDe, kafkaTopic_v1, key2, rmdWithValueSchemaId -> {
+   GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
+   Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), STALE_TS);
+ });
+
+ validateRmdData(rmdSerDe, kafkaTopic_v1, key3, rmdWithValueSchemaId -> {
+   GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
+   Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), FRESH_TS);
+ });

}

private void validateRmdData(
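The helper body is collapsed in this capture. Inferred from the call sites above, its shape is roughly the following; this is a sketch with hypothetical parameter names, not the repository's actual implementation:

private void validateRmdData(
    RmdSerDe rmdSerDe,
    String kafkaTopic,
    String key,
    Consumer<RmdWithValueSchemaId> rmdValidator) {
  // For each storage replica hosting kafkaTopic, fetch the stored RMD bytes for key,
  // deserialize them with rmdSerDe, and hand the resulting RmdWithValueSchemaId to
  // rmdValidator so the caller can run its assertions.
}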