From fda5dba8bfffd448d2491ad247c48f095a79b16f Mon Sep 17 00:00:00 2001
From: Jialin Liu
Date: Thu, 9 Nov 2023 13:24:26 -0800
Subject: [PATCH] [test] Add RMD CF data validation for A/A partial update TTL repush integration test

---
 .../venice/endToEnd/PartialUpdateTest.java    | 113 +++++++++++++-----
 1 file changed, 82 insertions(+), 31 deletions(-)

diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java
index f00b471d62..ae37d2ef65 100644
--- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java
+++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PartialUpdateTest.java
@@ -15,6 +15,9 @@ import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_AGGREGATE;
 import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_PARENT_CONTROLLER_D2_SERVICE;
 import static com.linkedin.venice.samza.VeniceSystemFactory.VENICE_PARENT_D2_ZK_HOSTS;
+import static com.linkedin.venice.schema.rmd.RmdConstants.TIMESTAMP_FIELD_NAME;
+import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.ACTIVE_ELEM_TS_FIELD_NAME;
+import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.DELETED_ELEM_TS_FIELD_NAME;
 import static com.linkedin.venice.schema.rmd.v1.CollectionRmdTimestamp.TOP_LEVEL_TS_FIELD_NAME;
 import static com.linkedin.venice.utils.IntegrationTestPushUtils.getSamzaProducer;
 import static com.linkedin.venice.utils.IntegrationTestPushUtils.getSamzaProducerConfig;
@@ -68,7 +71,6 @@ import com.linkedin.venice.meta.Version;
 import com.linkedin.venice.samza.VeniceSystemFactory;
 import com.linkedin.venice.schema.SchemaEntry;
-import com.linkedin.venice.schema.rmd.RmdConstants;
 import com.linkedin.venice.schema.rmd.RmdSchemaEntry;
 import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
 import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
@@ -740,10 +742,10 @@ public void testActiveActivePartialUpdateWithCompression(CompressionStrategy com
       });
       // Validate RMD bytes after PUT requests.
       validateRmdData(rmdSerDe, kafkaTopic_v1, key, rmdWithValueSchemaId -> {
-        GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get("timestamp");
+        GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
         GenericRecord collectionFieldTimestampRecord = (GenericRecord) timestampRecord.get(listFieldName);
         List activeElementsTimestamps =
-            (List) collectionFieldTimestampRecord.get("activeElementsTimestamps");
+            (List) collectionFieldTimestampRecord.get(ACTIVE_ELEM_TS_FIELD_NAME);
         assertEquals(activeElementsTimestamps.size(), totalUpdateCount * singleUpdateEntryCount);
       });
       TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS, true, () -> {
@@ -795,10 +797,10 @@ public void testActiveActivePartialUpdateWithCompression(CompressionStrategy com
       // Validate RMD bytes after PUT requests.
       String kafkaTopic_v2 = Version.composeKafkaTopic(storeName, 2);
       validateRmdData(rmdSerDe, kafkaTopic_v2, key, rmdWithValueSchemaId -> {
-        GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get("timestamp");
+        GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
         GenericRecord collectionFieldTimestampRecord = (GenericRecord) timestampRecord.get(listFieldName);
         List activeElementsTimestamps =
-            (List) collectionFieldTimestampRecord.get("activeElementsTimestamps");
+            (List) collectionFieldTimestampRecord.get(ACTIVE_ELEM_TS_FIELD_NAME);
         assertEquals(activeElementsTimestamps.size(), totalUpdateCount * singleUpdateEntryCount);
       });
 
@@ -813,13 +815,13 @@ public void testActiveActivePartialUpdateWithCompression(CompressionStrategy com
       });
       validateRmdData(rmdSerDe, kafkaTopic_v2, key, rmdWithValueSchemaId -> {
-        GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get("timestamp");
+        GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
         GenericRecord collectionFieldTimestampRecord = (GenericRecord) timestampRecord.get(listFieldName);
         List activeElementsTimestamps =
-            (List) collectionFieldTimestampRecord.get("activeElementsTimestamps");
+            (List) collectionFieldTimestampRecord.get(ACTIVE_ELEM_TS_FIELD_NAME);
         assertEquals(activeElementsTimestamps.size(), singleUpdateEntryCount);
         List deletedElementsTimestamps =
-            (List) collectionFieldTimestampRecord.get("deletedElementsTimestamps");
+            (List) collectionFieldTimestampRecord.get(DELETED_ELEM_TS_FIELD_NAME);
         assertEquals(deletedElementsTimestamps.size(), 0);
       });
 
@@ -831,10 +833,8 @@ public void testActiveActivePartialUpdateWithCompression(CompressionStrategy com
         assertTrue(nullRecord);
       });
       validateRmdData(rmdSerDe, kafkaTopic_v2, key, rmdWithValueSchemaId -> {
-        Assert.assertTrue(
-            rmdWithValueSchemaId.getRmdRecord().get(RmdConstants.TIMESTAMP_FIELD_NAME) instanceof GenericRecord);
-        GenericRecord timestampRecord =
-            (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(RmdConstants.TIMESTAMP_FIELD_NAME);
+        Assert.assertTrue(rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME) instanceof GenericRecord);
+        GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
         GenericRecord collectionFieldTimestampRecord = (GenericRecord) timestampRecord.get(listFieldName);
         assertEquals(collectionFieldTimestampRecord.get(TOP_LEVEL_TS_FIELD_NAME), (long) (totalUpdateCount) * 10);
       });
@@ -850,6 +850,15 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
     Schema valueSchema = AvroCompatibilityHelper.parse(loadFileAsString("CollectionRecordV1.avsc"));
     Schema partialUpdateSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchema);
+    Schema rmdSchema = RmdSchemaGenerator.generateMetadataSchema(valueSchema);
+    ReadOnlySchemaRepository schemaRepo = mock(ReadOnlySchemaRepository.class);
+    when(schemaRepo.getReplicationMetadataSchema(storeName, 1, 1)).thenReturn(new RmdSchemaEntry(1, 1, rmdSchema));
+    when(schemaRepo.getDerivedSchema(storeName, 1, 1)).thenReturn(new DerivedSchemaEntry(1, 1, partialUpdateSchema));
+    when(schemaRepo.getValueSchema(storeName, 1)).thenReturn(new SchemaEntry(1, valueSchema));
+    StringAnnotatedStoreSchemaCache stringAnnotatedStoreSchemaCache =
+        new StringAnnotatedStoreSchemaCache(storeName, schemaRepo);
+    RmdSerDe rmdSerDe = new RmdSerDe(stringAnnotatedStoreSchemaCache, 1);
+
     try (ControllerClient parentControllerClient = new ControllerClient(CLUSTER_NAME, parentControllerUrl)) {
       assertCommand(
           parentControllerClient
@@ -880,20 +889,23 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
     VeniceClusterWrapper veniceCluster = childDatacenters.get(0).getClusters().get(CLUSTER_NAME);
     SystemProducer veniceProducer = getSamzaProducer(veniceCluster, storeName, Version.PushType.STREAM);
-
+    long STALE_TS = 99999L;
+    long FRESH_TS = 100000L;
+    String STRING_MAP_FIELD = "stringMap";
+    String REGULAR_FIELD = "name";
     /**
      * Case 1: The record is partially stale, TTL repush should only keep the part that's fresh.
      */
     String key1 = "key1";
     // This update is expected to be carried into TTL repush.
     UpdateBuilder updateBuilder = new UpdateBuilderImpl(partialUpdateSchema);
-    updateBuilder.setEntriesToAddToMapField("stringMap", Collections.singletonMap("k1", "v1"));
-    sendStreamingRecord(veniceProducer, storeName, key1, updateBuilder.build(), 100000L);
+    updateBuilder.setEntriesToAddToMapField(STRING_MAP_FIELD, Collections.singletonMap("k1", "v1"));
+    sendStreamingRecord(veniceProducer, storeName, key1, updateBuilder.build(), FRESH_TS);
     // This update is expected to be WIPED OUT after TTL repush.
     updateBuilder = new UpdateBuilderImpl(partialUpdateSchema);
-    updateBuilder.setNewFieldValue("name", "new_name");
-    updateBuilder.setEntriesToAddToMapField("stringMap", Collections.singletonMap("k2", "v2"));
-    sendStreamingRecord(veniceProducer, storeName, key1, updateBuilder.build(), 99999L);
+    updateBuilder.setNewFieldValue(REGULAR_FIELD, "new_name");
+    updateBuilder.setEntriesToAddToMapField(STRING_MAP_FIELD, Collections.singletonMap("k2", "v2"));
+    sendStreamingRecord(veniceProducer, storeName, key1, updateBuilder.build(), STALE_TS);
 
     /**
      * Case 2: The record is fully stale, TTL repush should drop the record.
@@ -901,8 +913,8 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
     String key2 = "key2";
     // This update is expected to be WIPED OUT after TTL repush.
     updateBuilder = new UpdateBuilderImpl(partialUpdateSchema);
-    updateBuilder.setNewFieldValue("name", "new_name_2");
-    sendStreamingRecord(veniceProducer, storeName, key2, updateBuilder.build(), 99999L);
+    updateBuilder.setNewFieldValue(REGULAR_FIELD, "new_name_2");
+    sendStreamingRecord(veniceProducer, storeName, key2, updateBuilder.build(), STALE_TS);
 
     /**
      * Case 3: The record is fully fresh, TTL repush should keep the record.
@@ -910,8 +922,8 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
     String key3 = "key3";
     // This update is expected to be carried into TTL repush.
    updateBuilder = new UpdateBuilderImpl(partialUpdateSchema);
-    updateBuilder.setNewFieldValue("name", "new_name_3");
-    sendStreamingRecord(veniceProducer, storeName, key3, updateBuilder.build(), 100000L);
+    updateBuilder.setNewFieldValue(REGULAR_FIELD, "new_name_3");
+    sendStreamingRecord(veniceProducer, storeName, key3, updateBuilder.build(), FRESH_TS);
 
     /**
      * Validate the data is ready in storage before TTL repush.
@@ -922,25 +934,45 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
       try {
         GenericRecord valueRecord = readValue(storeReader, key1);
         assertNotNull(valueRecord);
-        assertEquals(valueRecord.get("name"), new Utf8("new_name"));
-        assertNotNull(valueRecord.get("stringMap"));
-        Map stringMapValue = (Map) valueRecord.get("stringMap");
+        assertEquals(valueRecord.get(REGULAR_FIELD), new Utf8("new_name"));
+        assertNotNull(valueRecord.get(STRING_MAP_FIELD));
+        Map stringMapValue = (Map) valueRecord.get(STRING_MAP_FIELD);
         assertEquals(stringMapValue.get(new Utf8("k1")), new Utf8("v1"));
         assertEquals(stringMapValue.get(new Utf8("k2")), new Utf8("v2"));
         valueRecord = readValue(storeReader, key2);
         assertNotNull(valueRecord);
-        assertEquals(valueRecord.get("name"), new Utf8("new_name_2"));
+        assertEquals(valueRecord.get(REGULAR_FIELD), new Utf8("new_name_2"));
         valueRecord = readValue(storeReader, key3);
         assertNotNull(valueRecord);
-        assertEquals(valueRecord.get("name"), new Utf8("new_name_3"));
+        assertEquals(valueRecord.get(REGULAR_FIELD), new Utf8("new_name_3"));
       } catch (Exception e) {
         throw new VeniceException(e);
       }
     });
   }
+    String kafkaTopic_v1 = Version.composeKafkaTopic(storeName, 1);
+    // Validate RMD bytes after initial update requests.
+    validateRmdData(rmdSerDe, kafkaTopic_v1, key1, rmdWithValueSchemaId -> {
+      GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
+      GenericRecord stringMapTsRecord = (GenericRecord) timestampRecord.get(STRING_MAP_FIELD);
+      Assert.assertEquals(stringMapTsRecord.get(TOP_LEVEL_TS_FIELD_NAME), 0L);
+      Assert.assertEquals(stringMapTsRecord.get(ACTIVE_ELEM_TS_FIELD_NAME), Arrays.asList(STALE_TS, FRESH_TS));
+      Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), STALE_TS);
+    });
+
+    validateRmdData(rmdSerDe, kafkaTopic_v1, key2, rmdWithValueSchemaId -> {
+      GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
+      Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), STALE_TS);
+    });
+
+    validateRmdData(rmdSerDe, kafkaTopic_v1, key3, rmdWithValueSchemaId -> {
+      GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
+      Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), FRESH_TS);
+    });
+
     // Perform one time repush to make sure repush can handle RMD chunks data correctly.
     Properties props =
         IntegrationTestPushUtils.defaultVPJProps(multiRegionMultiClusterWrapper, "dummyInputPath", storeName);
@@ -971,9 +1003,9 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
         // Key 1 is partially preserved.
         GenericRecord valueRecord = readValue(storeReader, key1);
         assertNotNull(valueRecord);
-        assertEquals(valueRecord.get("name"), new Utf8("default_name"));
-        assertNotNull(valueRecord.get("stringMap"));
-        Map stringMapValue = (Map) valueRecord.get("stringMap");
+        assertEquals(valueRecord.get(REGULAR_FIELD), new Utf8("default_name"));
+        assertNotNull(valueRecord.get(STRING_MAP_FIELD));
+        Map stringMapValue = (Map) valueRecord.get(STRING_MAP_FIELD);
         assertEquals(stringMapValue.get(new Utf8("k1")), new Utf8("v1"));
         assertNull(stringMapValue.get(new Utf8("k2")));
 
@@ -984,13 +1016,32 @@ public void testRepushWithTTLWithActiveActivePartialUpdateStore() {
         // Key 3 is fully preserved.
         valueRecord = readValue(storeReader, key3);
         assertNotNull(valueRecord);
-        assertEquals(valueRecord.get("name"), new Utf8("new_name_3"));
+        assertEquals(valueRecord.get(REGULAR_FIELD), new Utf8("new_name_3"));
       } catch (Exception e) {
         throw new VeniceException(e);
       }
     });
   }
+    String kafkaTopic_v2 = Version.composeKafkaTopic(storeName, 2);
+    // Validate RMD bytes after TTL repush.
+    validateRmdData(rmdSerDe, kafkaTopic_v2, key1, rmdWithValueSchemaId -> {
+      GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
+      GenericRecord stringMapTsRecord = (GenericRecord) timestampRecord.get(STRING_MAP_FIELD);
+      Assert.assertEquals(stringMapTsRecord.get(TOP_LEVEL_TS_FIELD_NAME), STALE_TS);
+      Assert.assertEquals(stringMapTsRecord.get(ACTIVE_ELEM_TS_FIELD_NAME), Collections.singletonList(FRESH_TS));
+      Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), STALE_TS);
+    });
+
+    validateRmdData(rmdSerDe, kafkaTopic_v1, key2, rmdWithValueSchemaId -> {
+      GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
+      Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), STALE_TS);
+    });
+
+    validateRmdData(rmdSerDe, kafkaTopic_v1, key3, rmdWithValueSchemaId -> {
+      GenericRecord timestampRecord = (GenericRecord) rmdWithValueSchemaId.getRmdRecord().get(TIMESTAMP_FIELD_NAME);
+      Assert.assertEquals(timestampRecord.get(REGULAR_FIELD), FRESH_TS);
+    });
 
   }
 
   private void validateRmdData(
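
Note on the RMD layout these assertions walk: the replication-metadata record generated for a value schema carries one timestamp entry per value field. A scalar field such as "name" gets a plain long; a collection field such as "stringMap" gets a nested record whose top-level long covers the whole collection and whose ACTIVE_ELEM_TS_FIELD_NAME / DELETED_ELEM_TS_FIELD_NAME arrays track per-element timestamps. The sketch below mimics that shape with a hand-rolled Avro schema so the assertions in the patch are easier to follow. It is illustrative only: the real schema comes from RmdSchemaGenerator, the literal names "timestamp", "activeElementsTimestamps", and "deletedElementsTimestamps" appear in the patch itself, and "topLevelFieldTimestamp" is an assumption for the value of TOP_LEVEL_TS_FIELD_NAME.

import java.util.Arrays;
import java.util.Collections;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class RmdLayoutSketch {
  public static void main(String[] args) {
    // Stand-in for the collection-field timestamp record (CollectionRmdTimestamp).
    // "topLevelFieldTimestamp" is assumed; the two array names come from the patch.
    Schema collectionTsSchema = SchemaBuilder.record("StringMapTs")
        .fields()
        .requiredLong("topLevelFieldTimestamp")
        .name("activeElementsTimestamps").type().array().items().longType().noDefault()
        .name("deletedElementsTimestamps").type().array().items().longType().noDefault()
        .endRecord();

    // Per-field timestamp record: scalar fields map to a long, collection fields
    // map to the nested record above.
    Schema timestampSchema = SchemaBuilder.record("FieldLevelTs")
        .fields()
        .requiredLong("name")
        .name("stringMap").type(collectionTsSchema).noDefault()
        .endRecord();

    long STALE_TS = 99999L;
    long FRESH_TS = 100000L;

    GenericRecord stringMapTs = new GenericData.Record(collectionTsSchema);
    stringMapTs.put("topLevelFieldTimestamp", 0L);
    stringMapTs.put("activeElementsTimestamps", Arrays.asList(STALE_TS, FRESH_TS));
    stringMapTs.put("deletedElementsTimestamps", Collections.emptyList());

    GenericRecord timestampRecord = new GenericData.Record(timestampSchema);
    timestampRecord.put("name", STALE_TS); // "name" was last written by the STALE_TS update
    timestampRecord.put("stringMap", stringMapTs);

    // Mirrors the pre-repush assertions for key1 in the test above (run with -ea).
    assert timestampRecord.get("name").equals(STALE_TS);
    GenericRecord mapTs = (GenericRecord) timestampRecord.get("stringMap");
    assert mapTs.get("activeElementsTimestamps").equals(Arrays.asList(STALE_TS, FRESH_TS));
  }
}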
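
For readers tracing why the three cases land where they do: the TTL repush keeps only data whose replication timestamp is at or beyond the TTL boundary, which in this test falls between STALE_TS (99999) and FRESH_TS (100000). A minimal sketch of that decision rule, separate from the actual repush pipeline (the real filtering happens inside the repush job, not in test code):

// Illustrative decision rule only, assuming a boundary ttlStart with
// STALE_TS < ttlStart <= FRESH_TS. Not the actual repush implementation.
static boolean isFresh(long fieldTimestamp, long ttlStart) {
  return fieldTimestamp >= ttlStart;
}

Applied per field and per collection element, this reproduces the post-repush assertions: key1 keeps the k1 map entry (FRESH_TS) while the k2 entry and the "name" update (both STALE_TS) are wiped, so "name" falls back to "default_name" (presumably the schema default); key2's only update is stale, so the whole record is dropped; key3's update is fresh, so the record is fully preserved.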