Skip to content

Commit

Permalink
Fast Serializer support for Avro-1.4 (#17)
Browse files Browse the repository at this point in the history
* Fast Serializer support for Avro-1.4

This code change adds support for fast serializer of Avro-1.4.
Now, fast serializer/de-serializer are available for
Avro 1.4/1.7/1.8.

This special logic for Avro-1.4 in Fast Serializer generation
is for enum/fixed since those two types in Avro-1.4 are implementing
differently from other versions, and the corresponding Schema couldn't
be extracted from the generic implementations (GenericData.EnumSymbol
and GenericData.Fixed).

* Added comment in `SchemaAssistant#isNamedType` method
  • Loading branch information
gaojieliu authored Jan 30, 2020
1 parent 64b723f commit c95786f
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 69 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ LinkedIn.

| Version | Serialization | Deserialization | Fast Serialization | Fast Deserialization |
| -------- | ------------- | --------------- | ------------------ | -------------------- |
| Avro 1.4 | Yes | Yes | No | Yes |
| Avro 1.4 | Yes | Yes | Yes | Yes |
| Avro 1.5 | ??? | ??? | No | No |
| Avro 1.6 | ??? | ??? | No | No |
| Avro 1.7 | Yes | Yes | Yes | Yes |
Expand Down
7 changes: 2 additions & 5 deletions avro-fastserde/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,7 @@ for (String avroVersion : avroVersions) {
task "testAvro${avroVersion}" (type: Test, group: "Verification", description: "runs unit tests with avro ${avroVersion}") {

useTestNG() {
includeGroups "deserializationTest"
if (!avroVersion.equals("14")) {
includeGroups "serializationTest"
}
excludeGroups "perfTest"
}

testLogging {
Expand Down Expand Up @@ -147,4 +144,4 @@ cleanupAndRebuildTestsForAvro18.dependsOn generateAvroClasses18

testAvro14.dependsOn cleanupAndRebuildTestsForAvro14
testAvro17.dependsOn cleanupAndRebuildTestsForAvro17
testAvro18.dependsOn cleanupAndRebuildTestsForAvro18
testAvro18.dependsOn cleanupAndRebuildTestsForAvro18
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.sun.codemodel.JConditional;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
import com.sun.codemodel.JFieldVar;
import com.sun.codemodel.JForEach;
import com.sun.codemodel.JForLoop;
import com.sun.codemodel.JMethod;
Expand All @@ -15,8 +16,11 @@
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.Schema;
import org.apache.avro.io.Encoder;
import org.apache.avro.util.Utf8;
Expand All @@ -31,11 +35,21 @@ public class FastSerializerGenerator<T> extends FastSerializerGeneratorBase<T> {
private final Map<String, JMethod> serializeMethodMap = new HashMap<>();
private final SchemaAssistant schemaAssistant;

/**
* Enum schema mapping for Avro-1.4.
*/
private JFieldVar enumSchemaMapField;
/**
* This field is used to decide whether the corresponding schema is already in {@link #enumSchemaMapField} or not.
*/
private final Set<Long> enumSchemaIdSet = new HashSet<>();


public FastSerializerGenerator(boolean useGenericTypes, Schema schema, File destination, ClassLoader classLoader,
String compileClassPath) {
super(schema, destination, classLoader, compileClassPath);
this.useGenericTypes = useGenericTypes;
this.schemaAssistant = new SchemaAssistant(codeModel, useGenericTypes);
this.schemaAssistant = new SchemaAssistantForSerializer(codeModel, useGenericTypes);
}

@Override
Expand All @@ -46,10 +60,24 @@ public FastSerializer<T> generateSerializer() {
try {
serializerClass = classPackage._class(className);

if (Utils.isAvro14()) {
/**
* In Avro-1.4, there is no way to infer/extract enum schema from {@link org.apache.avro.generic.GenericData.EnumSymbol}, so
* the serializer needs to maintain a mapping between the schema id and the actual {@link org.apache.avro.Schema.EnumSchema},
* and get the enum id from the corresponding EnumSchema in {@link #processEnum(Schema, JExpression, JBlock)}.
*/
enumSchemaMapField =
serializerClass.field(
JMod.PRIVATE,
codeModel.ref(Map.class).narrow(Long.class).narrow(Schema.class),
"enumSchemaMap",
JExpr._new(codeModel.ref(ConcurrentHashMap.class).narrow(Long.class).narrow(Schema.class)));
}

final JMethod serializeMethod = serializerClass.method(JMod.PUBLIC, void.class, "serialize");
final JVar serializeMethodParam;

JClass outputClass = classFromSchemaForSerializer(schema);
JClass outputClass = schemaAssistant.classFromSchema(schema);
serializerClass._implements(codeModel.ref(FastSerializer.class).narrow(outputClass));
serializeMethodParam = serializeMethod.param(outputClass, "data");

Expand Down Expand Up @@ -126,7 +154,7 @@ private void processRecord(final Schema recordSchema, JExpression recordExpr, fi
for (Schema.Field field : recordSchema.getFields()) {
Schema fieldSchema = field.schema();
if (SchemaAssistant.isComplexType(fieldSchema)) {
JClass fieldClass = classFromSchemaForSerializer(fieldSchema);
JClass fieldClass = schemaAssistant.classFromSchema(fieldSchema);
JVar containerVar = declareValueVar(field.name(), fieldSchema, body);
JExpression valueExpression = JExpr.invoke(recordExpr, "get").arg(JExpr.lit(field.pos()));
containerVar.init(JExpr.cast(fieldClass, valueExpression));
Expand All @@ -138,37 +166,8 @@ private void processRecord(final Schema recordSchema, JExpression recordExpr, fi
}
}

/**
* Special handling for "String" type since the underlying data could be "String" or "Utf8".
*
* This is different from the de-serializer since Avro will always decode it into "Utf8".
* @param fieldClass
* @return
*/
private JClass specialHandlingOfStringSchemaForSerializer(JClass fieldClass) {
if (fieldClass.equals(codeModel.ref(Utf8.class))) {
return codeModel.ref(CharSequence.class);
}
return fieldClass;
}

private JClass classFromSchemaForSerializer(Schema fieldSchema) {
JClass fieldClass = schemaAssistant.classFromSchema(fieldSchema);
return specialHandlingOfStringSchemaForSerializer(fieldClass);
}

private JClass classFromSchemaForSerializer(Schema fieldSchema, boolean abstractType) {
JClass fieldClass = schemaAssistant.classFromSchema(fieldSchema, abstractType);
return specialHandlingOfStringSchemaForSerializer(fieldClass);
}

private JClass classFromSchemaForSerializer(Schema fieldSchema, boolean abstractType, boolean rawType) {
JClass fieldClass = schemaAssistant.classFromSchema(fieldSchema, abstractType, rawType);
return specialHandlingOfStringSchemaForSerializer(fieldClass);
}

private void processArray(final Schema arraySchema, JExpression arrayExpr, JBlock body) {
final JClass arrayClass = classFromSchemaForSerializer(arraySchema);
final JClass arrayClass = schemaAssistant.classFromSchema(arraySchema);
body.invoke(JExpr.direct(ENCODER), "writeArrayStart");

final JExpression emptyArrayCondition = arrayExpr.eq(JExpr._null()).cor(JExpr.invoke(arrayExpr, "isEmpty"));
Expand Down Expand Up @@ -199,9 +198,9 @@ private void processArray(final Schema arraySchema, JExpression arrayExpr, JBloc

private void processMap(final Schema mapSchema, JExpression mapExpr, JBlock body) {

final JClass mapClass = classFromSchemaForSerializer(mapSchema);
final JClass mapClass = schemaAssistant.classFromSchema(mapSchema);

JClass keyClass = specialHandlingOfStringSchemaForSerializer(schemaAssistant.keyClassFromMapSchema(mapSchema));
JClass keyClass = schemaAssistant.keyClassFromMapSchema(mapSchema);

body.invoke(JExpr.direct(ENCODER), "writeMapStart");

Expand Down Expand Up @@ -280,8 +279,8 @@ private void processUnion(final Schema unionSchema, JExpression unionExpr, JBloc
continue;
}

JClass optionClass = classFromSchemaForSerializer(schemaOption);
JClass rawOptionClass = classFromSchemaForSerializer(schemaOption, true, true);
JClass optionClass = schemaAssistant.classFromSchema(schemaOption);
JClass rawOptionClass = schemaAssistant.classFromSchema(schemaOption, true, true);
JExpression condition = unionExpr._instanceof(rawOptionClass);
if (useGenericTypes && SchemaAssistant.isNamedType(schemaOption)) {
condition = condition.cand(JExpr.invoke(JExpr.lit(schemaOption.getFullName()), "equals")
Expand All @@ -304,18 +303,41 @@ private void processUnion(final Schema unionSchema, JExpression unionExpr, JBloc
}

private void processFixed(Schema fixedSchema, JExpression fixedValueExpression, JBlock body) {
JClass fixedClass = classFromSchemaForSerializer(fixedSchema);
JClass fixedClass = schemaAssistant.classFromSchema(fixedSchema);
body.invoke(JExpr.direct(ENCODER), "writeFixed")
.arg(JExpr.invoke(JExpr.cast(fixedClass, fixedValueExpression), "bytes"));
}

private void processEnum(Schema enumSchema, JExpression enumValueExpression, JBlock body) {
JClass enumClass = classFromSchemaForSerializer(enumSchema);
JClass enumClass = schemaAssistant.classFromSchema(enumSchema);
JExpression enumValueCasted = JExpr.cast(enumClass, enumValueExpression);
JExpression valueToWrite;
if (useGenericTypes) {
valueToWrite =
JExpr.invoke(enumValueCasted.invoke("getSchema"), "getEnumOrdinal").arg(enumValueCasted.invoke("toString"));
if (Utils.isAvro14()) {
/**
* Register/retrieve the corresponding {@link org.apache.avro.Schema.EnumSchema} from the mapping.
*/
long enumSchemaFingerprint = Utils.getSchemaFingerprint(enumSchema);
if (enumSchemaIdSet.contains(enumSchemaFingerprint)) {
valueToWrite = JExpr.invoke(
enumSchemaMapField.invoke("get").arg(JExpr.lit(enumSchemaFingerprint)),
"getEnumOrdinal"
).arg(enumValueCasted.invoke("toString"));
} else {
enumSchemaIdSet.add(enumSchemaFingerprint);
JVar enumSchemaVar = body.decl(codeModel.ref(Schema.class),
getVariableName(enumSchema.getName() + "EnumSchema"),
codeModel.ref(Schema.class).staticInvoke("parse").arg(enumSchema.toString())
);
body.invoke(enumSchemaMapField, "put").arg(JExpr.lit(enumSchemaFingerprint)).arg(enumSchemaVar);
valueToWrite = JExpr.invoke(enumSchemaVar, "getEnumOrdinal").arg(enumValueCasted.invoke("toString"));
}
} else {
valueToWrite = JExpr.invoke(
enumValueCasted.invoke("getSchema"),
"getEnumOrdinal"
).arg(enumValueCasted.invoke("toString"));
}
} else {
valueToWrite = enumValueCasted.invoke("ordinal");
}
Expand All @@ -339,7 +361,7 @@ private void processString(final Schema primitiveSchema, JExpression primitiveVa

private void processPrimitive(final Schema primitiveSchema, JExpression primitiveValueExpression, JBlock body) {
String writeFunction;
JClass primitiveClass = classFromSchemaForSerializer(primitiveSchema);
JClass primitiveClass = schemaAssistant.classFromSchema(primitiveSchema);
JExpression castedValue = JExpr.cast(primitiveClass, primitiveValueExpression);
switch (primitiveSchema.getType()) {
case STRING:
Expand Down Expand Up @@ -373,7 +395,7 @@ private void processPrimitive(final Schema primitiveSchema, JExpression primitiv

private JVar declareValueVar(final String name, final Schema schema, JBlock block) {
if (SchemaAssistant.isComplexType(schema)) {
return block.decl(classFromSchemaForSerializer(schema, true), getVariableName(StringUtils.uncapitalize(name)),
return block.decl(schemaAssistant.classFromSchema(schema, true), getVariableName(StringUtils.uncapitalize(name)),
JExpr._null());
} else {
throw new FastDeserializerGeneratorException("Incorrect container variable: " + schema.getType()); //.getName());
Expand All @@ -400,7 +422,7 @@ private JMethod createMethod(final Schema schema) {
JMethod method =
serializerClass.method(JMod.PUBLIC, codeModel.VOID, "serialize" + schema.getName() + nextUniqueInt());
method._throws(IOException.class);
method.param(classFromSchemaForSerializer(schema), "data");
method.param(schemaAssistant.classFromSchema(schema), "data");
method.param(Encoder.class, ENCODER);

method.annotate(SuppressWarnings.class).param("value", "unchecked");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public SchemaAssistant(JCodeModel codeModel, boolean useGenericTypes) {
this.exceptionsFromStringable = new HashSet<>();
}

protected JCodeModel getCodeModel() {
return codeModel;
}

protected Set<String> getUsedFullyQualifiedClassNameSet() {
return fullyQualifiedClassNameSet;
}
Expand All @@ -67,9 +71,13 @@ public static boolean isComplexType(Schema schema) {
public static boolean isNamedType(Schema schema) {
switch (schema.getType()) {
case RECORD:
return true;
case ENUM:
case FIXED:
return true;
/**
* This is used to avoid `getSchema` call since in Avro-1.4, `getSchema` method is not available for Enum and Fixed.
*/
return Utils.isAvro14() ? false : true;
default:
return false;
}
Expand Down Expand Up @@ -129,7 +137,7 @@ public JClass keyClassFromMapSchema(Schema schema) {
extendExceptionsFromStringable(schema.getProp(KEY_CLASS_PROP));
return codeModel.ref(schema.getProp(KEY_CLASS_PROP));
} else {
return codeModel.ref(Utf8.class);
return defaultStringType();
}
}

Expand Down Expand Up @@ -249,7 +257,7 @@ public JClass classFromSchema(Schema schema, boolean abstractType, boolean rawTy
outputClass = codeModel.ref(schema.getProp(CLASS_PROP));
extendExceptionsFromStringable(schema.getProp(CLASS_PROP));
} else {
outputClass = codeModel.ref(Utf8.class);
outputClass = defaultStringType();
}
break;
case BYTES:
Expand All @@ -270,6 +278,10 @@ public JClass classFromSchema(Schema schema, boolean abstractType, boolean rawTy
return outputClass;
}

protected JClass defaultStringType() {
return codeModel.ref(Utf8.class);
}

public JExpression getEnumValueByName(Schema enumSchema, JExpression nameExpr, JInvocation getSchemaExpr) {
if (useGenericTypes) {
if (Utils.isAvro14()) {
Expand Down Expand Up @@ -309,7 +321,7 @@ public JExpression getStringableValue(Schema schema, JExpression stringExpr) {
if (isStringable(schema)) {
return JExpr._new(classFromSchema(schema)).arg(stringExpr);
} else {
return JExpr._new(codeModel.ref(Utf8.class)).arg(stringExpr);
return JExpr._new(defaultStringType()).arg(stringExpr);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.linkedin.avro.fastserde;

import com.sun.codemodel.JClass;
import com.sun.codemodel.JCodeModel;


public class SchemaAssistantForSerializer extends SchemaAssistant {
public SchemaAssistantForSerializer(JCodeModel codeModel, boolean useGenericTypes) {
super(codeModel, useGenericTypes);
}

/**
* Special handling for "String" type since the underlying data could be "String" or "Utf8".
*
* This is different from the de-serializer since Avro will always decode it into "Utf8".
* @return
*/
@Override
protected JClass defaultStringType() {
return getCodeModel().ref(CharSequence.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public static Long getSchemaFingerprint(Schema schema) {
AVRO_VERSIONS_SUPPORTED_FOR_DESERIALIZER.add(AvroVersion.AVRO_1_7);
AVRO_VERSIONS_SUPPORTED_FOR_DESERIALIZER.add(AvroVersion.AVRO_1_8);

AVRO_VERSIONS_SUPPORTED_FOR_SERIALIZER.add(AvroVersion.AVRO_1_4);
AVRO_VERSIONS_SUPPORTED_FOR_SERIALIZER.add(AvroVersion.AVRO_1_7);
AVRO_VERSIONS_SUPPORTED_FOR_SERIALIZER.add(AvroVersion.AVRO_1_8);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,16 +108,31 @@ public void serializeStringableFields() throws URISyntaxException, MalformedURLE
specificDataFromDecoder(StringableRecord.SCHEMA$, writeWithFastAvro(record, StringableRecord.SCHEMA$));

// then
Assert.assertEquals(exampleBigDecimal, afterDecoding.bigdecimal);
Assert.assertEquals(exampleBigInteger, afterDecoding.biginteger);
Assert.assertEquals(exampleFile, afterDecoding.file);
Assert.assertEquals(Collections.singletonList(exampleURL), afterDecoding.urlArray);
Assert.assertEquals(Collections.singletonMap(exampleURL, exampleBigInteger), afterDecoding.urlMap);
Assert.assertNotNull(afterDecoding.subRecord);
Assert.assertEquals(exampleURI, afterDecoding.subRecord.uriField);
Assert.assertNotNull(afterDecoding.subRecordWithSubRecord);
Assert.assertNotNull(afterDecoding.subRecordWithSubRecord.subRecord);
Assert.assertEquals(exampleURI, afterDecoding.subRecordWithSubRecord.subRecord.uriField);
if (Utils.isAvro14()) {
Assert.assertEquals(exampleBigDecimal.toString(), afterDecoding.bigdecimal.toString());
Assert.assertEquals(exampleBigInteger.toString(), afterDecoding.biginteger.toString());
Assert.assertEquals(exampleFile.toString(), afterDecoding.file.toString());
Assert.assertEquals(Collections.singletonList(new Utf8(exampleURL.toString())), afterDecoding.urlArray);
Assert.assertEquals(
Collections.singletonMap(new Utf8(exampleURL.toString()), new Utf8(exampleBigInteger.toString())),
afterDecoding.urlMap);
Assert.assertNotNull(afterDecoding.subRecord);
Assert.assertEquals(exampleURI.toString(), afterDecoding.subRecord.uriField.toString());
Assert.assertNotNull(afterDecoding.subRecordWithSubRecord);
Assert.assertNotNull(afterDecoding.subRecordWithSubRecord.subRecord);
Assert.assertEquals(exampleURI.toString(), afterDecoding.subRecordWithSubRecord.subRecord.uriField.toString());
} else {
Assert.assertEquals(exampleBigDecimal, afterDecoding.bigdecimal);
Assert.assertEquals(exampleBigInteger, afterDecoding.biginteger);
Assert.assertEquals(exampleFile, afterDecoding.file);
Assert.assertEquals(Collections.singletonList(exampleURL), afterDecoding.urlArray);
Assert.assertEquals(Collections.singletonMap(exampleURL, exampleBigInteger), afterDecoding.urlMap);
Assert.assertNotNull(afterDecoding.subRecord);
Assert.assertEquals(exampleURI, afterDecoding.subRecord.uriField);
Assert.assertNotNull(afterDecoding.subRecordWithSubRecord);
Assert.assertNotNull(afterDecoding.subRecordWithSubRecord.subRecord);
Assert.assertEquals(exampleURI, afterDecoding.subRecordWithSubRecord.subRecord.uriField);
}
}
}

Expand Down
Loading

0 comments on commit c95786f

Please sign in to comment.