Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fast-generic-serializer supports 'enum' and 'fixed' types #515

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"type": "record",
"name": "FastSerdeEnums",
"namespace": "com.linkedin.avro.fastserde.generated.avro",
"doc": "Used in tests to confirm generic-FastSerializer supports enum types",
"fields": [
{
"name": "enumField",
"type": {
"name": "JustSimpleEnum",
"type": "enum",
"symbols": [
"E1",
"E2",
"E3",
"E4",
"E5"
]
}
},
{
"name": "arrayOfEnums",
"type": [
"null",
{
"type": "array",
"items": "JustSimpleEnum"
}
],
"default": null
},
{
"name": "mapOfEnums",
"type": {
"type": "map",
"values": "JustSimpleEnum"
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"type": "record",
"name": "FastSerdeFixed",
"namespace": "com.linkedin.avro.fastserde.generated.avro",
"doc": "Used in tests to confirm generic-FastSerializer supports fixed types",
"fields": [
{
"name": "fixedField",
"type": {
"name": "FixedOfSize10",
"type": "fixed",
"size": 10
}
},
{
"name": "arrayOfFixed",
"type": [
"null",
{
"type": "array",
"items": "FixedOfSize10"
}
],
"default": null
},
{
"name": "mapOfFixed",
"type": {
"type": "map",
"values": "FixedOfSize10"
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
import com.linkedin.avro.fastserde.coldstart.ColdPrimitiveFloatList;
import com.linkedin.avro.fastserde.coldstart.ColdPrimitiveIntList;
import com.linkedin.avro.fastserde.coldstart.ColdPrimitiveLongList;
import com.linkedin.avro.fastserde.generated.avro.FastSerdeEnums;
import com.linkedin.avro.fastserde.generated.avro.FastSerdeFixed;
import com.linkedin.avro.fastserde.generated.avro.FixedOfSize10;
import com.linkedin.avro.fastserde.generated.avro.JustSimpleEnum;
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import java.io.ByteArrayOutputStream;
import java.io.File;
Expand All @@ -17,6 +21,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericData;
Expand All @@ -29,6 +35,7 @@
import org.testng.Assert;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

import static com.linkedin.avro.fastserde.FastSerdeTestsSupport.*;

Expand Down Expand Up @@ -75,8 +82,8 @@ public void shouldWritePrimitives() {
builder.put("testFlippedIntUnion", null);
builder.put("testString", "aaa");
builder.put("testStringUnion", "aaa");
builder.put("testLong", 1l);
builder.put("testLongUnion", 1l);
builder.put("testLong", 1L);
builder.put("testLongUnion", 1L);
builder.put("testDouble", 1.0);
builder.put("testDoubleUnion", 1.0);
builder.put("testFloat", 1.0f);
Expand All @@ -92,11 +99,11 @@ public void shouldWritePrimitives() {
// then
Assert.assertEquals(1, record.get("testInt"));
Assert.assertEquals(1, record.get("testIntUnion"));
Assert.assertEquals(null, record.get("testFlippedIntUnion"));
Assert.assertNull(record.get("testFlippedIntUnion"));
Assert.assertEquals("aaa", record.get("testString").toString());
Assert.assertEquals("aaa", record.get("testStringUnion").toString());
Assert.assertEquals(1l, record.get("testLong"));
Assert.assertEquals(1l, record.get("testLongUnion"));
Assert.assertEquals(1L, record.get("testLong"));
Assert.assertEquals(1L, record.get("testLongUnion"));
Assert.assertEquals(1.0, record.get("testDouble"));
Assert.assertEquals(1.0, record.get("testDoubleUnion"));
Assert.assertEquals(1.0f, record.get("testFloat"));
Expand All @@ -113,8 +120,9 @@ public GenericData.Fixed newFixed(Schema fixedSchema, byte[] bytes) {
return fixed;
}

@SuppressWarnings("unchecked")
@Test(groups = {"serializationTest"})
public void shouldWriteFixed() {
public void shouldWriteGenericRecordWithFixed() {
// given
Schema fixedSchema = createFixedSchema("testFixed", 2);
Schema recordSchema = createRecord(
Expand Down Expand Up @@ -142,7 +150,52 @@ public void shouldWriteFixed() {
}

@Test(groups = {"serializationTest"})
public void shouldWriteEnum() {
public void shouldWriteSpecificRecordWithFixed() {
  // Serializes a specific FastSerdeFixed record (plain fixed field, array of fixed, map of fixed) and
  // checks that the decoded GenericRecord exposes GenericData.Fixed values carrying the same bytes.

  // given
  // FixedOfSize10 is declared with size 10, so every payload must encode to exactly 10 bytes.
  // An explicit single-byte charset is used instead of String#getBytes(), whose result depends on
  // the platform default charset and could have a different length.
  final java.nio.charset.Charset charset = java.nio.charset.StandardCharsets.US_ASCII;
  final byte[] bytes1 = "2023-09-07".getBytes(charset);
  final byte[] bytes2 = "2023-09-08".getBytes(charset);
  final byte[] bytes3 = "2023-09-09".getBytes(charset);

  // The generated fixed class is populated through its no-arg constructor plus bytes(byte[]).
  Function<byte[], FixedOfSize10> fixedCreator = bytes -> {
    FixedOfSize10 fixedOfSize10 = new FixedOfSize10();
    fixedOfSize10.bytes(bytes);
    return fixedOfSize10;
  };

  Map<CharSequence, FixedOfSize10> mapOfFixed = new HashMap<>();
  mapOfFixed.put("day1", fixedCreator.apply(bytes1));
  mapOfFixed.put("day2", fixedCreator.apply(bytes2));

  FastSerdeFixed fastSerdeFixed = new FastSerdeFixed();
  setField(fastSerdeFixed, "fixedField", fixedCreator.apply(bytes1));
  setField(fastSerdeFixed, "arrayOfFixed", Lists.newArrayList(
      fixedCreator.apply(bytes1), fixedCreator.apply(bytes2), fixedCreator.apply(bytes3)));
  setField(fastSerdeFixed, "mapOfFixed", mapOfFixed);

  // when
  // NOTE(review): dataAsBinaryDecoder/decodeRecord are defined outside this view; presumably the former
  // serializes with the fast serializer under test and the latter reads the bytes back generically.
  GenericRecord record = decodeRecord(fastSerdeFixed.getSchema(), dataAsBinaryDecoder(fastSerdeFixed));

  // then
  Assert.assertTrue(record.get("fixedField") instanceof GenericData.Fixed);
  Assert.assertEquals(((GenericData.Fixed) record.get("fixedField")).bytes(), bytes1);

  GenericData.Array<?> arrayOfFixed = (GenericData.Array<?>) record.get("arrayOfFixed");
  Assert.assertEquals(arrayOfFixed.size(), 3);
  Assert.assertTrue(arrayOfFixed.get(0) instanceof GenericData.Fixed);
  Assert.assertEquals(((GenericData.Fixed) arrayOfFixed.get(0)).bytes(), bytes1);
  Assert.assertEquals(((GenericData.Fixed) arrayOfFixed.get(1)).bytes(), bytes2);
  Assert.assertEquals(((GenericData.Fixed) arrayOfFixed.get(2)).bytes(), bytes3);

  // The cast is unchecked; the lookups below use Utf8 keys for the decoded map.
  @SuppressWarnings("unchecked")
  Map<CharSequence, GenericData.Fixed> deserializedMapOfFixed = (Map<CharSequence, GenericData.Fixed>) record.get("mapOfFixed");
  Assert.assertEquals(deserializedMapOfFixed.size(), 2);
  Assert.assertEquals(deserializedMapOfFixed.get(new Utf8("day1")).bytes(), bytes1);
  Assert.assertEquals(deserializedMapOfFixed.get(new Utf8("day2")).bytes(), bytes2);
}

@SuppressWarnings("unchecked")
@Test(groups = {"serializationTest"})
public void shouldWriteGenericRecordWithEnums() {
// given
Schema enumSchema = createEnumSchema("testEnum", new String[]{"A", "B"});
Schema recordSchema = createRecord(
Expand Down Expand Up @@ -171,6 +224,38 @@ public void shouldWriteEnum() {
Assert.assertEquals("A", ((List<GenericData.EnumSymbol>) record.get("testEnumUnionArray")).get(0).toString());
}

/**
 * Serializes a specific {@code FastSerdeEnums} record (single enum field, array of enums, map of enums)
 * and checks that the decoded {@link GenericRecord} exposes the same symbols.
 */
@Test(groups = {"serializationTest"})
public void shouldWriteSpecificRecordWithEnums() {
  // given
  FastSerdeEnums input = new FastSerdeEnums();
  setField(input, "enumField", JustSimpleEnum.E1);
  setField(input, "arrayOfEnums", Lists.newArrayList(JustSimpleEnum.E1, JustSimpleEnum.E3, JustSimpleEnum.E4));

  Map<CharSequence, JustSimpleEnum> enumsByKey = new HashMap<>();
  enumsByKey.put("due", JustSimpleEnum.E2);
  enumsByKey.put("cinque", JustSimpleEnum.E5);
  setField(input, "mapOfEnums", enumsByKey);

  // when
  GenericRecord decoded = decodeRecord(input.getSchema(), dataAsBinaryDecoder(input));

  // then: the plain enum field comes back as a generic enum symbol
  Object decodedEnumField = decoded.get("enumField");
  Assert.assertTrue(decodedEnumField instanceof GenericData.EnumSymbol);
  Assert.assertEquals(decodedEnumField.toString(), "E1");

  // then: array elements keep their order and symbols
  GenericData.Array<?> decodedArray = (GenericData.Array<?>) decoded.get("arrayOfEnums");
  Assert.assertEquals(decodedArray.size(), 3);
  JustSimpleEnum[] expectedArraySymbols = {JustSimpleEnum.E1, JustSimpleEnum.E3, JustSimpleEnum.E4};
  for (int i = 0; i < expectedArraySymbols.length; i++) {
    Assert.assertEquals(decodedArray.get(i).toString(), expectedArraySymbols[i].name());
  }

  // then: map values are looked up with Utf8 keys
  @SuppressWarnings("unchecked")
  Map<CharSequence, GenericData.EnumSymbol> decodedMap =
      (Map<CharSequence, GenericData.EnumSymbol>) decoded.get("mapOfEnums");
  Assert.assertEquals(decodedMap.size(), 2);
  GenericData.EnumSymbol dueSymbol = decodedMap.get(new Utf8("due"));
  Assert.assertEquals(dueSymbol.toString(), JustSimpleEnum.E2.toString());
  GenericData.EnumSymbol cinqueSymbol = decodedMap.get(new Utf8("cinque"));
  Assert.assertEquals(cinqueSymbol.toString(), JustSimpleEnum.E5.toString());
}

@Test(groups = {"serializationTest"})
public void shouldWriteSubRecordField() {
// given
Expand Down Expand Up @@ -221,6 +306,7 @@ public void shouldWriteRightUnionIndex() {
Assert.assertEquals(unionRecord.getSchema().getName(), "record2");
}

@SuppressWarnings("unchecked")
@Test(groups = {"serializationTest"})
public void shouldWriteSubRecordCollectionsField() {
// given
Expand Down Expand Up @@ -253,12 +339,13 @@ public void shouldWriteSubRecordCollectionsField() {
Assert.assertEquals("abc",
((List<GenericData.Record>) record.get("recordsArrayUnion")).get(0).get("subField").toString());
Assert.assertEquals("abc",
((Map<String, GenericData.Record>) record.get("recordsMap")).get(new Utf8("1")).get("subField").toString());
Assert.assertEquals("abc", ((Map<String, GenericData.Record>) record.get("recordsMapUnion")).get(new Utf8("1"))
((Map<CharSequence, GenericData.Record>) record.get("recordsMap")).get(new Utf8("1")).get("subField").toString());
Assert.assertEquals("abc", ((Map<CharSequence, GenericData.Record>) record.get("recordsMapUnion")).get(new Utf8("1"))
.get("subField")
.toString());
}

@SuppressWarnings("unchecked")
@Test(groups = {"serializationTest"})
public void shouldWriteSubRecordComplexCollectionsField() {
// given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static TestRecord emptyTestRecord() {
setField(record, "recordsMapArray", Collections.emptyMap());

setField(record, "testInt", 1);
setField(record, "testLong", 1l);
setField(record, "testLong", 1L);
setField(record, "testDouble", 1.0);
setField(record, "testFloat", 1.0f);
setField(record, "testBoolean", true);
Expand Down Expand Up @@ -87,8 +87,8 @@ public void shouldWritePrimitives() {
setField(record, "testIntUnion", 1);
setField(record, "testString", "aaa");
setField(record, "testStringUnion", "aaa");
setField(record, "testLong", 1l);
setField(record, "testLongUnion", 1l);
setField(record, "testLong", 1L);
setField(record, "testLongUnion", 1L);
setField(record, "testDouble", 1.0);
setField(record, "testDoubleUnion", 1.0);
setField(record, "testFloat", 1.0f);
Expand All @@ -106,18 +106,19 @@ record = decodeRecordFast(TestRecord.SCHEMA$, dataAsDecoder(record));
Assert.assertEquals(1, ((Integer) getField(record, "testIntUnion")).intValue());
Assert.assertEquals("aaa", getField(record, "testString").toString());
Assert.assertEquals("aaa", getField(record, "testStringUnion").toString());
Assert.assertEquals(1l, getField(record, "testLong"));
Assert.assertEquals(1l, ((Long) getField(record, "testLongUnion")).longValue());
Assert.assertEquals(1L, getField(record, "testLong"));
Assert.assertEquals(1L, ((Long) getField(record, "testLongUnion")).longValue());
Assert.assertEquals(1.0, getField(record, "testDouble"));
Assert.assertEquals(1.0, getField(record, "testDoubleUnion"));
Assert.assertEquals(1.0f, getField(record, "testFloat"));
Assert.assertEquals(1.0f, getField(record, "testFloatUnion"));
Assert.assertEquals(true, getField(record, "testBoolean"));
Assert.assertEquals(true, ((Boolean) getField(record, "testBooleanUnion")).booleanValue());
Assert.assertTrue((Boolean) getField(record, "testBoolean"));
Assert.assertTrue((Boolean) getField(record, "testBooleanUnion"));
Assert.assertEquals(ByteBuffer.wrap(new byte[]{0x01, 0x02}), getField(record, "testBytes"));
Assert.assertEquals(ByteBuffer.wrap(new byte[]{0x01, 0x02}), getField(record, "testBytesUnion"));
}

@SuppressWarnings("unchecked")
@Test(groups = {"serializationTest"})
public void shouldWriteFixed() {
// given
Expand Down Expand Up @@ -147,6 +148,7 @@ record = decodeRecordFast(TestRecord.SCHEMA$, dataAsDecoder(record));
Assert.assertEquals(new byte[]{0x04}, ((List<TestFixed>) getField(record, "testFixedUnionArray")).get(0).bytes());
}

@SuppressWarnings("unchecked")
@Test(groups = {"serializationTest"})
public void shouldWriteEnum() {
// given
Expand Down Expand Up @@ -185,6 +187,7 @@ record = decodeRecordFast(TestRecord.SCHEMA$, dataAsDecoder(record));
Assert.assertEquals("abc", getField((SubRecord) getField(record, "subRecord"), "subField").toString());
}

@SuppressWarnings("unchecked")
@Test(groups = {"serializationTest"})
public void shouldWriteSubRecordCollectionsField() {

Expand Down Expand Up @@ -212,6 +215,7 @@ record = decodeRecordFast(TestRecord.SCHEMA$, dataAsDecoder(record));
Assert.assertEquals("abc", getField(((Map<CharSequence, SubRecord>) getField(record, "recordsMapUnion")).get(new Utf8("1")), "subField").toString());
}

@SuppressWarnings("unchecked")
@Test(groups = {"serializationTest"})
public void shouldWriteSubRecordComplexCollectionsField() {
// given
Expand Down Expand Up @@ -331,7 +335,7 @@ public void shouldWriteMapOfRecords() {
recordsMap.put("2", testRecord);

// when
Map<String, TestRecord> map = decodeRecordFast(mapRecordSchema, dataAsDecoder(recordsMap, mapRecordSchema));
Map<CharSequence, TestRecord> map = decodeRecordFast(mapRecordSchema, dataAsDecoder(recordsMap, mapRecordSchema));

// then
Assert.assertEquals(2, map.size());
Expand Down Expand Up @@ -378,7 +382,6 @@ public <T> Decoder dataAsDecoder(T data, Schema schema) {
return DecoderFactory.defaultFactory().createBinaryDecoder(baos.toByteArray(), null);
}

@SuppressWarnings("unchecked")
private <T> T decodeRecordFast(Schema writerSchema, Decoder decoder) {
SpecificDatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ private JExpression parseDefaultValue(Schema schema, Object defaultValue, JBlock
fixedBytesArray.add(JExpr.lit(b));
}
// For specific Fixed type, Avro-1.4 will only generate the class with default constructor without params
JClass fixedClass = schemaAssistant.classFromSchema(schema);
JClass fixedClass = schemaAssistant.classFromSchema(schema, false, false, false);
if (useGenericTypes) {
JInvocation newFixedExpr = null;
if (Utils.isAvro14()) {
Expand Down Expand Up @@ -1018,7 +1018,7 @@ private void processFixed(final Schema schema, JBlock body, FieldAction action,
if (reuseSupplier.get().equals(JExpr._null())) {
body.assign(fixedBuffer, JExpr.direct(" new byte[" + schema.getFixedSize() + "]"));
} else {
/**
/*
* Here will check whether the length of the reused fixed is same as the one to be deserialized or not.
* If not, here will initialize a new byte array to store it.
*/
Expand All @@ -1035,7 +1035,7 @@ private void processFixed(final Schema schema, JBlock body, FieldAction action,
}
body.directStatement(DECODER + ".readFixed(" + fixedBuffer.name() + ");");

JClass fixedClass = schemaAssistant.classFromSchema(schema);
JClass fixedClass = schemaAssistant.classFromSchema(schema, false, false, false);
if (useGenericTypes) {
JInvocation newFixedExpr;
if (Utils.isAvro14()) {
Expand Down
Loading
Loading