Skip to content

Commit

Permalink
6 more reader/writer implementation classes are verified in unit tests:
Browse files Browse the repository at this point in the history
- ColdGenericDatumReader
- ColdSpecificDatumReader
- FastDeserializerWithAvroGenericImpl
- FastDeserializerWithAvroSpecificImpl
- FastSerializerWithAvroGenericImpl
- FastSerializerWithAvroSpecificImpl
  • Loading branch information
krisso-rtb committed Sep 20, 2023
1 parent 11e46a6 commit 5da126b
Showing 1 changed file with 58 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.ColdGenericDatumReader;
import org.apache.avro.generic.ColdSpecificDatumReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
Expand All @@ -39,6 +41,7 @@
import com.linkedin.avro.fastserde.FastDeserializer;
import com.linkedin.avro.fastserde.FastGenericDeserializerGenerator;
import com.linkedin.avro.fastserde.FastGenericSerializerGenerator;
import com.linkedin.avro.fastserde.FastSerdeCache;
import com.linkedin.avro.fastserde.FastSerializer;
import com.linkedin.avro.fastserde.FastSpecificDeserializerGenerator;
import com.linkedin.avro.fastserde.FastSpecificSerializerGenerator;
Expand Down Expand Up @@ -122,32 +125,45 @@ protected <T extends SpecificRecordBase> byte[] verifySerializers(T data,
FunctionThrowingIOException<T, ByteBuffer> toByteBuffer) throws IOException {
// given
Schema schema = data.getSchema();
GenericData genericData = copyConversions(data.getSpecificData(), new GenericData());
SpecificData specificData = copyConversions(data.getSpecificData(), new SpecificData());

FastSerializer<T> fastGenericSerializer = new FastGenericSerializerGenerator<T>(
schema, classesDir, classLoader, null, copyConversions(data.getSpecificData(), new GenericData()))
schema, classesDir, classLoader, null, genericData)
.generateSerializer();

FastSerializer<T> fastSpecificSerializer = new FastSpecificSerializerGenerator<T>(
schema, classesDir, classLoader, null, copyConversions(data.getSpecificData(), new SpecificData()))
schema, classesDir, classLoader, null, specificData)
.generateSerializer();

FastSerdeCache.FastSerializerWithAvroGenericImpl<T> fastSerializerWithAvroGeneric =
new FastSerdeCache.FastSerializerWithAvroGenericImpl<>(schema, genericData);

FastSerdeCache.FastSerializerWithAvroSpecificImpl<T> fastSerializerWithAvroSpecific =
new FastSerdeCache.FastSerializerWithAvroSpecificImpl<>(schema, specificData);

GenericDatumWriter<T> genericDatumWriter = new GenericDatumWriter<>(
schema, copyConversions(data.getSpecificData(), new GenericData()));
schema, genericData);

SpecificDatumWriter<T> specificDatumWriter = new SpecificDatumWriter<>(
schema, copyConversions(data.getSpecificData(), new SpecificData()));
schema, specificData);

fixConversionsIfAvro19(data.getSpecificData());

// when
byte[] fastGenericBytes = serialize(fastGenericSerializer, data);
byte[] fastSpecificBytes = serialize(fastSpecificSerializer, data);
byte[] fastGenericWithAvroBytes = serialize(fastSerializerWithAvroGeneric, data);
byte[] fastSpecificWithAvroBytes = serialize(fastSerializerWithAvroSpecific, data);
byte[] genericBytes = serialize(genericDatumWriter, data);
byte[] specificBytes = serialize(specificDatumWriter, data);
byte[] defaultBytes = toByteBuffer.apply(data).array();

// then all 5 serializing methods should return the same array of bytes
// then all 7 serializing methods should return the same array of bytes
Assert.assertEquals(fastGenericBytes, defaultBytes);
Assert.assertEquals(fastSpecificBytes, defaultBytes);
Assert.assertEquals(fastGenericWithAvroBytes, defaultBytes);
Assert.assertEquals(fastSpecificWithAvroBytes, defaultBytes);
Assert.assertEquals(genericBytes, defaultBytes);
Assert.assertEquals(specificBytes, defaultBytes);

Expand All @@ -160,35 +176,57 @@ protected <T extends SpecificRecordBase> T verifyDeserializers(byte[] bytesWithH
T data = fromByteBuffer.apply(ByteBuffer.wrap(bytesWithHeader));
byte[] bytes = dropV1Header(bytesWithHeader);
Schema schema = data.getSchema();
GenericData genericData = copyConversions(data.getSpecificData(), new GenericData());
SpecificData specificData = copyConversions(data.getSpecificData(), new SpecificData());
Supplier<Decoder> decoderSupplier = () -> DecoderFactory.get().binaryDecoder(bytes, null);

FastDeserializer<GenericData.Record> fastGenericDeserializer = new FastGenericDeserializerGenerator<GenericData.Record>(
schema, schema, classesDir, classLoader, null, copyConversions(data.getSpecificData(), new GenericData()))
schema, schema, classesDir, classLoader, null, genericData)
.generateDeserializer();

FastDeserializer<T> fastSpecificDeserializer = new FastSpecificDeserializerGenerator<T>(
schema, schema, classesDir, classLoader, null, copyConversions(data.getSpecificData(), new SpecificData()))
schema, schema, classesDir, classLoader, null, specificData)
.generateDeserializer();

GenericDatumReader<GenericData.Record> genericDatumReader = new GenericDatumReader<>(
schema, schema, copyConversions(data.getSpecificData(), new GenericData()));
FastSerdeCache.FastDeserializerWithAvroGenericImpl<GenericData.Record> fastDeserializerWithAvroGeneric =
new FastSerdeCache.FastDeserializerWithAvroGenericImpl<>(schema, schema, genericData);

FastSerdeCache.FastDeserializerWithAvroSpecificImpl<T> fastDeserializerWithAvroSpecific =
new FastSerdeCache.FastDeserializerWithAvroSpecificImpl<>(schema, schema, specificData);

GenericDatumReader<GenericData.Record> genericDatumReader = new GenericDatumReader<>(schema, schema, genericData);

SpecificDatumReader<T> specificDatumReader = new SpecificDatumReader<>(schema, schema, specificData);

SpecificDatumReader<T> specificDatumReader = new SpecificDatumReader<>(
schema, schema, copyConversions(data.getSpecificData(), new SpecificData()));
ColdGenericDatumReader<GenericData.Record> coldGenericDatumReader = ColdGenericDatumReader.of(schema, schema, genericData);

ColdSpecificDatumReader<T> coldSpecificDatumReader = ColdSpecificDatumReader.of(schema, schema, specificData);

// when deserializing with different serializers/writers
GenericData.Record deserializedFastGeneric = fastGenericDeserializer.deserialize(decoderSupplier.get());
T deserializedFastSpecific = fastSpecificDeserializer.deserialize(decoderSupplier.get());
GenericData.Record deserializedGenericReader = genericDatumReader.read(null, decoderSupplier.get());
T deserializedSpecificReader = specificDatumReader.read(null, decoderSupplier.get());
GenericData.Record deserializedWithFastGeneric = fastGenericDeserializer.deserialize(decoderSupplier.get());
T deserializedWithFastSpecific = fastSpecificDeserializer.deserialize(decoderSupplier.get());

GenericData.Record deserializedWithFastWithAvroGeneric = fastDeserializerWithAvroGeneric.deserialize(decoderSupplier.get());
T deserializedWithFastWithAvroSpecific = fastDeserializerWithAvroSpecific.deserialize(decoderSupplier.get());

GenericData.Record deserializedWithGenericReader = genericDatumReader.read(null, decoderSupplier.get());
T deserializedWithSpecificReader = specificDatumReader.read(null, decoderSupplier.get());

GenericData.Record deserializedWithColdGenericReader = coldGenericDatumReader.read(null, decoderSupplier.get());
T deserializedWithColdSpecificReader = coldSpecificDatumReader.read(null, decoderSupplier.get());

// then
Assert.assertEquals(deserializedFastSpecific, data);
Assert.assertEquals(deserializedSpecificReader, data);
assertEquals(deserializedFastGeneric, data);
assertEquals(deserializedGenericReader, data);
Assert.assertEquals(deserializedWithFastSpecific, data);
Assert.assertEquals(deserializedWithFastWithAvroSpecific, data);
Assert.assertEquals(deserializedWithSpecificReader, data);
Assert.assertEquals(deserializedWithColdSpecificReader, data);

assertEquals(deserializedWithFastGeneric, data);
assertEquals(deserializedWithFastWithAvroGeneric, data);
assertEquals(deserializedWithGenericReader, data);
assertEquals(deserializedWithColdGenericReader, data);

return deserializedFastSpecific;
return deserializedWithFastSpecific;
}

protected <T extends SpecificRecordBase> void assertEquals(GenericData.Record actual, T expected) throws IOException {
Expand Down

0 comments on commit 5da126b

Please sign in to comment.