
Commit

Fix test
sixpluszero committed Jan 27, 2024
1 parent 53500d0 commit bac9b92
Showing 3 changed files with 213 additions and 15 deletions.
KafkaTopicDumper.java
@@ -175,8 +175,7 @@ public KafkaTopicDumper(
for (MultiSchemaResponse.Schema valueSchema: schemas) {
this.allValueSchemas[i] = AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation(valueSchema.getSchemaStr());
i++;
this.schemaDataMap
.put(valueSchema.getId(), new ValueAndDerivedSchemaData(valueSchema.getId(), valueSchema.getSchemaStr()));
this.schemaDataMap.put(valueSchema.getId(), new ValueAndDerivedSchemaData(valueSchema.getSchemaStr()));
}
if (storeInfo.isWriteComputationEnabled()) {
for (MultiSchemaResponse.Schema schema: controllerClient.getAllValueAndDerivedSchema(storeName).getSchemas()) {
@@ -192,16 +191,13 @@ public KafkaTopicDumper(
if (storeInfo.isActiveActiveReplicationEnabled()) {
for (MultiSchemaResponse.Schema schema: controllerClient.getAllReplicationMetadataSchemas(storeName)
.getSchemas()) {
if (!schema.isDerivedSchema()) {
continue;
}
/**
* This is intended, as {@link com.linkedin.venice.controller.server.SchemaRoutes} implementation is wrong
* for RMD schema entry.
*/
int valueSchemaId = schema.getRmdValueSchemaId();
int protocolId = schema.getId();
this.schemaDataMap.get(valueSchemaId).setUpdateSchema(protocolId, schema.getSchemaStr());
this.schemaDataMap.get(valueSchemaId).setRmdSchema(protocolId, schema.getSchemaStr());
}
}
}
@@ -385,6 +381,12 @@ String buildDataRecordLog(
&& put.replicationMetadataPayload.remaining() > 0) {
Decoder rmdDecoder =
decoderFactory.binaryDecoder(ByteUtils.extractByteArray(put.replicationMetadataPayload), null);
LOGGER.info(
"{} {} {} {}",
schemaDataMap.get(put.schemaId).getRmdRecordReader(put.replicationMetadataVersionId),
schemaDataMap.get(put.schemaId),
put.schemaId,
put.replicationMetadataVersionId);
rmdRecord = schemaDataMap.get(put.schemaId)
.getRmdRecordReader(put.replicationMetadataVersionId)
.read(null, rmdDecoder);
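Note on the two hunks above: the registration side now stores replication-metadata (RMD) schemas via setRmdSchema, keyed by getRmdValueSchemaId, which is what lets the lookup in buildDataRecordLog resolve a reader from (put.schemaId, put.replicationMetadataVersionId). A minimal sketch of that flow, reusing the Venice types shown in this diff; the local variable names are illustrative and the snippet only compiles inside the venice codebase.

// Sketch only: registration and lookup as implied by the hunks above.
ValueAndDerivedSchemaData schemaData = new ValueAndDerivedSchemaData(valueSchemaEntry.getSchemaStr());
schemaDataMap.put(valueSchemaEntry.getId(), schemaData);
// For an RMD schema entry, getId() carries the RMD protocol version and
// getRmdValueSchemaId() names the value schema it annotates.
schemaDataMap.get(rmdSchemaEntry.getRmdValueSchemaId())
    .setRmdSchema(rmdSchemaEntry.getId(), rmdSchemaEntry.getSchemaStr());

// Later, while dumping a PUT message that carries an RMD payload:
Decoder rmdDecoder = DecoderFactory.get()
    .binaryDecoder(ByteUtils.extractByteArray(put.replicationMetadataPayload), null);
Object rmdRecord = schemaDataMap.get(put.schemaId)
    .getRmdRecordReader(put.replicationMetadataVersionId)
    .read(null, rmdDecoder);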
ValueAndDerivedSchemaData.java
@@ -9,29 +9,27 @@

public class ValueAndDerivedSchemaData {
private final Schema valueSchema;
private final int id;
private final Map<Integer, Schema> updateSchemaMap = new VeniceConcurrentHashMap<>();
private final Map<Integer, Schema> rmdSchemaMap = new VeniceConcurrentHashMap<>();
private final GenericDatumReader<Object> valueRecordReader;
private final Map<Integer, GenericDatumReader<Object>> updateRecordReaderMap = new VeniceConcurrentHashMap<>();
private final Map<Integer, GenericDatumReader<Object>> rmdRecordReaderMap = new VeniceConcurrentHashMap<>();

public ValueAndDerivedSchemaData(int id, String valueSchemaStr) {
public ValueAndDerivedSchemaData(String valueSchemaStr) {
this.valueSchema = AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation(valueSchemaStr);
this.id = id;
this.valueRecordReader = new GenericDatumReader<>(valueSchema);
this.valueRecordReader = new GenericDatumReader<>(valueSchema, valueSchema);
}

public void setUpdateSchema(int protocolId, String updateSchemaStr) {
Schema schema = AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation(updateSchemaStr);
updateSchemaMap.put(protocolId, schema);
updateRecordReaderMap.put(protocolId, new GenericDatumReader<>(schema));
updateRecordReaderMap.put(protocolId, new GenericDatumReader<>(schema, schema));
}

public void setRmdSchema(int protocolId, String rmdSchemaStr) {
Schema schema = AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation(rmdSchemaStr);
rmdSchemaMap.put(protocolId, schema);
rmdRecordReaderMap.put(protocolId, new GenericDatumReader<>(schema));
rmdRecordReaderMap.put(protocolId, new GenericDatumReader<>(schema, schema));
}

GenericDatumReader<Object> getValueRecordReader() {
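The readers above are now constructed with an explicit writer/reader schema pair rather than the single-schema GenericDatumReader constructor. A self-contained sketch of that pattern with plain Avro; the record schema and class name below are illustrative, not taken from Venice.

// Minimal demo: serialize with a writer schema, then decode with an explicit
// (writer, reader) schema pair, mirroring new GenericDatumReader<>(schema, schema).
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public final class WriterReaderSchemaDemo {
  public static void main(String[] args) throws IOException {
    Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Name\",\"fields\":["
            + "{\"name\":\"firstName\",\"type\":\"string\"},"
            + "{\"name\":\"lastName\",\"type\":\"string\"}]}");

    GenericRecord record = new GenericData.Record(schema);
    record.put("firstName", "f1");
    record.put("lastName", "l1");

    // Serialize with the writer schema.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
    encoder.flush();

    // Deserialize, naming writer and reader schema explicitly (same schema here).
    GenericDatumReader<Object> reader = new GenericDatumReader<>(schema, schema);
    Object decoded = reader.read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
    System.out.println(decoded);
  }
}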
TestKafkaTopicDumper.java
@@ -2,18 +2,21 @@

import static com.linkedin.venice.kafka.protocol.enums.MessageType.DELETE;
import static com.linkedin.venice.kafka.protocol.enums.MessageType.PUT;
import static com.linkedin.venice.kafka.protocol.enums.MessageType.UPDATE;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.linkedin.venice.chunking.TestChunkingUtils;
import com.linkedin.venice.controllerapi.ControllerClient;
import com.linkedin.venice.controllerapi.MultiSchemaResponse;
import com.linkedin.venice.controllerapi.SchemaResponse;
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.kafka.protocol.Delete;
import com.linkedin.venice.kafka.protocol.GUID;
import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
import com.linkedin.venice.kafka.protocol.ProducerMetadata;
import com.linkedin.venice.kafka.protocol.Put;
import com.linkedin.venice.kafka.protocol.Update;
import com.linkedin.venice.message.KafkaKey;
import com.linkedin.venice.meta.StoreInfo;
import com.linkedin.venice.meta.Version;
@@ -23,16 +26,27 @@
import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.schema.rmd.RmdSchemaGenerator;
import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter;
import com.linkedin.venice.serialization.KeyWithChunkingSuffixSerializer;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.ChunkedValueManifestSerializer;
import com.linkedin.venice.serializer.RecordSerializer;
import com.linkedin.venice.serializer.SerializerDeserializerFactory;
import com.linkedin.venice.storage.protocol.ChunkedKeySuffix;
import com.linkedin.venice.storage.protocol.ChunkedValueManifest;
import com.linkedin.venice.utils.ByteUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.writer.update.UpdateBuilderImpl;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.logging.log4j.LogManager;
import org.testng.Assert;
import org.testng.annotations.Test;

@@ -119,11 +133,142 @@ public void testAdminToolConsumptionForChunkedData() throws IOException {
" ChunkMd=(type:WITH_CHUNK_MANIFEST, FirstChunkMd=(guid:00000000000000000000000000000000,seg:1,seq:1))");

PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> pubSubMessage5 =
getDeleteRecord(serializedKey, 4, pubSubTopicPartition);
getDeleteRecord(serializedKey, null, pubSubTopicPartition);
String deleteChunkMetadataLog = kafkaTopicDumper.getChunkMetadataLog(pubSubMessage5);
Assert.assertEquals(deleteChunkMetadataLog, " ChunkMd=(type:WITH_FULL_VALUE)");
}

@Test
public void testDumpDataRecord() throws IOException {
Schema keySchema = TestWriteUtils.STRING_SCHEMA;
Schema valueSchema = TestWriteUtils.NAME_RECORD_V1_SCHEMA;
Schema updateSchema = WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchema);
Schema rmdSchema = RmdSchemaGenerator.generateMetadataSchema(valueSchema);
RecordSerializer keySerializer = SerializerDeserializerFactory.getAvroGenericSerializer(keySchema);
RecordSerializer valueSerializer = SerializerDeserializerFactory.getAvroGenericSerializer(valueSchema);
RecordSerializer updateSerializer = SerializerDeserializerFactory.getAvroGenericSerializer(updateSchema);
RecordSerializer rmdSerializer = SerializerDeserializerFactory.getAvroGenericSerializer(rmdSchema);

String storeName = "test_store";
int versionNumber = 1;
String topic = Version.composeKafkaTopic(storeName, versionNumber);
ControllerClient controllerClient = mock(ControllerClient.class);

SchemaResponse keySchemaResponse = mock(SchemaResponse.class);
when(keySchemaResponse.getSchemaStr()).thenReturn(keySchema.toString());
when(controllerClient.getKeySchema(storeName)).thenReturn(keySchemaResponse);

MultiSchemaResponse valueSchemaResponse = mock(MultiSchemaResponse.class);
MultiSchemaResponse.Schema[] valueSchemas = new MultiSchemaResponse.Schema[1];
valueSchemas[0] = new MultiSchemaResponse.Schema();
valueSchemas[0].setId(1);
valueSchemas[0].setSchemaStr(valueSchema.toString());
when(valueSchemaResponse.getSchemas()).thenReturn(valueSchemas);
when(controllerClient.getAllValueSchema(storeName)).thenReturn(valueSchemaResponse);

MultiSchemaResponse rmdSchemaResponse = mock(MultiSchemaResponse.class);
MultiSchemaResponse.Schema[] rmdSchemas = new MultiSchemaResponse.Schema[1];
rmdSchemas[0] = new MultiSchemaResponse.Schema();
rmdSchemas[0].setSchemaStr(rmdSchema.toString());
rmdSchemas[0].setId(1);
rmdSchemas[0].setRmdValueSchemaId(1);
when(rmdSchemaResponse.getSchemas()).thenReturn(rmdSchemas);
when(controllerClient.getAllReplicationMetadataSchemas(storeName)).thenReturn(rmdSchemaResponse);

MultiSchemaResponse valueAndDerivedSchemaResponse = mock(MultiSchemaResponse.class);
MultiSchemaResponse.Schema[] valueAndDerivedSchema = new MultiSchemaResponse.Schema[2];
valueAndDerivedSchema[0] = valueSchemas[0];
valueAndDerivedSchema[1] = new MultiSchemaResponse.Schema();
valueAndDerivedSchema[1].setDerivedSchemaId(1);
valueAndDerivedSchema[1].setId(1);
valueAndDerivedSchema[1].setSchemaStr(updateSchema.toString());
when(valueAndDerivedSchemaResponse.getSchemas()).thenReturn(valueAndDerivedSchema);
when(controllerClient.getAllValueAndDerivedSchema(storeName)).thenReturn(valueAndDerivedSchemaResponse);

StoreResponse storeResponse = mock(StoreResponse.class);
StoreInfo storeInfo = mock(StoreInfo.class);
Version version = mock(Version.class);
when(version.isChunkingEnabled()).thenReturn(false);
when(storeInfo.getPartitionCount()).thenReturn(1);
when(storeInfo.isActiveActiveReplicationEnabled()).thenReturn(true);
when(storeInfo.isWriteComputationEnabled()).thenReturn(true);
when(storeInfo.getVersion(versionNumber)).thenReturn(Optional.of(version));
when(controllerClient.getStore(storeName)).thenReturn(storeResponse);
when(storeResponse.getStore()).thenReturn(storeInfo);

PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
int assignedPartition = 0;
long startOffset = 0;
long endOffset = 4;
String keyString = "test";
byte[] serializedKey = keySerializer.serialize(keyString);
PubSubTopicPartition pubSubTopicPartition =
new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topic), assignedPartition);

ApacheKafkaConsumerAdapter apacheKafkaConsumer = mock(ApacheKafkaConsumerAdapter.class);
long startTimestamp = 10;
long endTimestamp = 20;
when(apacheKafkaConsumer.offsetForTime(pubSubTopicPartition, startTimestamp)).thenReturn(startOffset);
when(apacheKafkaConsumer.offsetForTime(pubSubTopicPartition, endTimestamp)).thenReturn(endOffset);
when(apacheKafkaConsumer.endOffset(pubSubTopicPartition)).thenReturn(endOffset);

KafkaTopicDumper kafkaTopicDumper = new KafkaTopicDumper(
controllerClient,
apacheKafkaConsumer,
topic,
assignedPartition,
0,
2,
"",
3,
true,
true,
false);

// Test different message type.
GenericRecord valueRecord = new GenericData.Record(valueSchema);
valueRecord.put("firstName", "f1");
valueRecord.put("lastName", "l1");
GenericRecord rmdRecord = new GenericData.Record(rmdSchema);
rmdRecord.put("timestamp", 1L);
rmdRecord.put("replication_checkpoint_vector", Collections.singletonList(1L));
GenericRecord updateRecord = new UpdateBuilderImpl(updateSchema).setNewFieldValue("firstName", "f2").build();

// Test PUT with and without RMD
LogManager.getLogger(TestKafkaTopicDumper.class).info("DEBUGGING: {} {}", rmdRecord, rmdSchema);
byte[] serializedValue = valueSerializer.serialize(valueRecord);
byte[] serializedRmd = rmdSerializer.serialize(rmdRecord);
byte[] serializedUpdate = updateSerializer.serialize(updateRecord);
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> putMessage =
getPutRecord(serializedKey, serializedValue, serializedRmd, pubSubTopicPartition);
String returnedLog = kafkaTopicDumper.buildDataRecordLog(putMessage, false);
String expectedLog = String.format("Key: %s; Value: %s; Schema: %d", keyString, valueRecord, 1);
Assert.assertEquals(returnedLog, expectedLog);
returnedLog = kafkaTopicDumper.buildDataRecordLog(putMessage, true);
expectedLog = String.format("Key: %s; Value: %s; Schema: %d; RMD: %s", keyString, valueRecord, 1, rmdRecord);
Assert.assertEquals(returnedLog, expectedLog);

// Test UPDATE
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> updateMessage =
getUpdateRecord(serializedKey, serializedUpdate, pubSubTopicPartition);
returnedLog = kafkaTopicDumper.buildDataRecordLog(updateMessage, false);
expectedLog = String.format("Key: %s; Value: %s; Schema: %d-%d", keyString, updateRecord, 1, 1);
Assert.assertEquals(returnedLog, expectedLog);
returnedLog = kafkaTopicDumper.buildDataRecordLog(updateMessage, true);
expectedLog = String.format("Key: %s; Value: %s; Schema: %d-%d; RMD: null", keyString, updateRecord, 1, 1);
Assert.assertEquals(returnedLog, expectedLog);

// Test DELETE with and without RMD
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> deleteMessage =
getDeleteRecord(serializedKey, serializedRmd, pubSubTopicPartition);
returnedLog = kafkaTopicDumper.buildDataRecordLog(deleteMessage, false);
expectedLog = String.format("Key: %s; Value: %s; Schema: %d", keyString, null, 1);
Assert.assertEquals(returnedLog, expectedLog);
returnedLog = kafkaTopicDumper.buildDataRecordLog(deleteMessage, true);
expectedLog = String.format("Key: %s; Value: %s; Schema: %d; RMD: %s", keyString, null, 1, rmdRecord);
Assert.assertEquals(returnedLog, expectedLog);
}
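Taken together, the assertions above pin down the dumper's per-record log layout: value schema id alone for PUT/DELETE, a value-schema-id/update-protocol-id pair for UPDATE, and an optional RMD suffix. A small illustrative formatter that reproduces it; this is a hypothetical helper, not a method from KafkaTopicDumper.

  // Hypothetical helper mirroring the format asserted in testDumpDataRecord.
  static String formatDataRecordLog(Object key, Object value, int schemaId,
      Integer updateProtocolId, boolean logRmd, Object rmdRecord) {
    StringBuilder sb = new StringBuilder()
        .append("Key: ").append(key)
        .append("; Value: ").append(value)
        .append("; Schema: ").append(schemaId);
    if (updateProtocolId != null) {
      sb.append("-").append(updateProtocolId); // UPDATE messages only
    }
    if (logRmd) {
      sb.append("; RMD: ").append(rmdRecord);  // null when no RMD payload is present
    }
    return sb.toString();
  }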

private PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> getChunkedRecord(
byte[] serializedKey,
int firstChunkSegmentNumber,
Expand Down Expand Up @@ -200,7 +345,7 @@ private PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> getChunkValueManifes

private PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> getDeleteRecord(
byte[] serializedKey,
int pubSubMessageOffset,
byte[] serializedRmd,
PubSubTopicPartition pubSubTopicPartition) {
KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer();
byte[] chunkKeyWithSuffix = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serializedKey);
@@ -215,7 +360,60 @@ private PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> getDeleteRecord(

Delete delete = new Delete();
delete.schemaId = 1;
if (serializedRmd != null) {
delete.replicationMetadataPayload = ByteBuffer.wrap(serializedRmd);
delete.replicationMetadataVersionId = 1;
}
messageEnvelope.payloadUnion = delete;
return new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, pubSubMessageOffset, 0, 20);
return new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, 1, 0, 20);
}

private PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> getPutRecord(
byte[] serializedKey,
byte[] serializedValue,
byte[] serializedRmd,
PubSubTopicPartition pubSubTopicPartition) {
KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer();
byte[] chunkKeyWithSuffix = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serializedKey);
KafkaKey kafkaKey = new KafkaKey(PUT, chunkKeyWithSuffix);
KafkaMessageEnvelope messageEnvelope = new KafkaMessageEnvelope();
messageEnvelope.messageType = PUT.getValue();
messageEnvelope.producerMetadata = new ProducerMetadata();
messageEnvelope.producerMetadata.messageTimestamp = 0;
messageEnvelope.producerMetadata.segmentNumber = 0;
messageEnvelope.producerMetadata.messageSequenceNumber = 0;
messageEnvelope.producerMetadata.producerGUID = new GUID();
Put put = new Put();
put.schemaId = 1;
put.putValue = ByteBuffer.wrap(serializedValue);
if (serializedRmd != null) {
put.replicationMetadataPayload = ByteBuffer.wrap(serializedRmd);
put.replicationMetadataVersionId = 1;
}
messageEnvelope.payloadUnion = put;
return new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, 1, 0, serializedValue.length);
}

private PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> getUpdateRecord(
byte[] serializedKey,
byte[] serializedValue,
PubSubTopicPartition pubSubTopicPartition) {
KeyWithChunkingSuffixSerializer keyWithChunkingSuffixSerializer = new KeyWithChunkingSuffixSerializer();
byte[] chunkKeyWithSuffix = keyWithChunkingSuffixSerializer.serializeNonChunkedKey(serializedKey);
KafkaKey kafkaKey = new KafkaKey(UPDATE, chunkKeyWithSuffix);
KafkaMessageEnvelope messageEnvelope = new KafkaMessageEnvelope();
messageEnvelope.messageType = UPDATE.getValue();
messageEnvelope.producerMetadata = new ProducerMetadata();
messageEnvelope.producerMetadata.messageTimestamp = 0;
messageEnvelope.producerMetadata.segmentNumber = 0;
messageEnvelope.producerMetadata.messageSequenceNumber = 0;
messageEnvelope.producerMetadata.producerGUID = new GUID();
Update update = new Update();
update.schemaId = 1;
update.updateValue = ByteBuffer.wrap(serializedValue);
update.updateSchemaId = 1;

messageEnvelope.payloadUnion = update;
return new ImmutablePubSubMessage<>(kafkaKey, messageEnvelope, pubSubTopicPartition, 1, 0, serializedValue.length);
}
}
