Skip to content

Commit

Permalink
Tweaked the fast-avro schema fingerprinting logic
Browse files Browse the repository at this point in the history
It used to leverage the Avro fingerprinting, which is based on the
"parsing canonical form" of the schema, but this ignores properties
such as which class to use for String deserialization. This is a
problem in cases where we wish to deserialize the same schema both
with and without Java Strings, since the cache will return the same
deserializer for both, ultimately leading to class cast issues.

The new approach is to simply take the hash code of the full string
of the schema.
  • Loading branch information
FelixGV committed Aug 17, 2023
1 parent 2019012 commit d6ab13f
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,34 +224,48 @@ public void javaStringPropertyInReaderSchemaTest(Boolean whetherUseFastDeseriali
stringMap.put("ddd", "eee");
record.put("testStringMap", stringMap);

Decoder decoder = writeWithFastAvro(record, writerSchema, false);
Decoder decoder1 = writeWithFastAvro(record, writerSchema, false);
Decoder decoder2 = writeWithFastAvro(record, writerSchema, false);

GenericRecord afterDecoding;
GenericRecord afterDecodingWithJavaString, afterDecodingWithoutJavaString;
if (whetherUseFastDeserializer) {
afterDecoding = readWithFastAvro(writerSchema, readerSchema, decoder, false);
afterDecodingWithJavaString = readWithFastAvro(writerSchema, readerSchema, decoder1, false);
afterDecodingWithoutJavaString = readWithFastAvro(writerSchema, writerSchema, decoder2, false);
} else {
afterDecoding = readWithSlowAvro(writerSchema, readerSchema, decoder, false);
afterDecodingWithJavaString = readWithSlowAvro(writerSchema, readerSchema, decoder1, false);
afterDecodingWithoutJavaString = readWithSlowAvro(writerSchema, writerSchema, decoder2, false);
}

if (Utils.isAbleToSupportJavaStrings()){
Assert.assertTrue(afterDecoding.get(0) instanceof String, "String is expected, but got: " + afterDecoding.get(0).getClass());
Assert.assertTrue(afterDecoding.get(1) instanceof String, "String is expected, but got: " + afterDecoding.get(0).getClass());
Assert.assertTrue(((GenericData.Array<Object>) afterDecoding.get(2)).get(0) instanceof String,
"String is expected, but got: " + ((GenericData.Array<Object>) afterDecoding.get(2)).get(0).getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecoding.get(3)).keySet().iterator().next() instanceof String,
"String is expected, but got: " + ((Map<Object, Object>) afterDecoding.get(3)).keySet().iterator().next().getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecoding.get(3)).values().iterator().next() instanceof String,
"String is expected, but got: " + ((Map<Object, Object>) afterDecoding.get(3)).values().iterator().next().getClass());
Assert.assertTrue(afterDecodingWithJavaString.get(0) instanceof String, "String is expected, but got: " + afterDecodingWithJavaString.get(0).getClass());
Assert.assertTrue(afterDecodingWithJavaString.get(1) instanceof String, "String is expected, but got: " + afterDecodingWithJavaString.get(0).getClass());
Assert.assertTrue(((GenericData.Array<Object>) afterDecodingWithJavaString.get(2)).get(0) instanceof String,
"String is expected, but got: " + ((GenericData.Array<Object>) afterDecodingWithJavaString.get(2)).get(0).getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecodingWithJavaString.get(3)).keySet().iterator().next() instanceof String,
"String is expected, but got: " + ((Map<Object, Object>) afterDecodingWithJavaString.get(3)).keySet().iterator().next().getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecodingWithJavaString.get(3)).values().iterator().next() instanceof String,
"String is expected, but got: " + ((Map<Object, Object>) afterDecodingWithJavaString.get(3)).values().iterator().next().getClass());
} else {
Assert.assertTrue(afterDecoding.get(0) instanceof Utf8, "Utf8 is expected, but got: " + afterDecoding.get(0).getClass());
Assert.assertTrue(afterDecoding.get(1) instanceof Utf8, "Utf8 is expected, but got: " + afterDecoding.get(0).getClass());
Assert.assertTrue(((GenericData.Array<Object>) afterDecoding.get(2)).get(0) instanceof Utf8,
"Utf8 is expected, but got: " + ((GenericData.Array<Object>) afterDecoding.get(2)).get(0).getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecoding.get(3)).keySet().iterator().next() instanceof Utf8,
"Utf8 is expected, but got: " + ((Map<Object, Object>) afterDecoding.get(3)).keySet().iterator().next().getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecoding.get(3)).values().iterator().next() instanceof Utf8,
"Utf8 is expected, but got: " + ((Map<Object, Object>) afterDecoding.get(3)).values().iterator().next().getClass());
Assert.assertTrue(afterDecodingWithJavaString.get(0) instanceof Utf8, "Utf8 is expected, but got: " + afterDecodingWithJavaString.get(0).getClass());
Assert.assertTrue(afterDecodingWithJavaString.get(1) instanceof Utf8, "Utf8 is expected, but got: " + afterDecodingWithJavaString.get(0).getClass());
Assert.assertTrue(((GenericData.Array<Object>) afterDecodingWithJavaString.get(2)).get(0) instanceof Utf8,
"Utf8 is expected, but got: " + ((GenericData.Array<Object>) afterDecodingWithJavaString.get(2)).get(0).getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecodingWithJavaString.get(3)).keySet().iterator().next() instanceof Utf8,
"Utf8 is expected, but got: " + ((Map<Object, Object>) afterDecodingWithJavaString.get(3)).keySet().iterator().next().getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecodingWithJavaString.get(3)).values().iterator().next() instanceof Utf8,
"Utf8 is expected, but got: " + ((Map<Object, Object>) afterDecodingWithJavaString.get(3)).values().iterator().next().getClass());
}

// Regardless of the above, we should also be able to decode to Utf8 within the same JVM that already decoded Java Strings
Assert.assertTrue(afterDecodingWithoutJavaString.get(0) instanceof Utf8, "Utf8 is expected, but got: " + afterDecodingWithoutJavaString.get(0).getClass());
Assert.assertTrue(afterDecodingWithoutJavaString.get(1) instanceof Utf8, "Utf8 is expected, but got: " + afterDecodingWithoutJavaString.get(0).getClass());
Assert.assertTrue(((GenericData.Array<Object>) afterDecodingWithoutJavaString.get(2)).get(0) instanceof Utf8,
"Utf8 is expected, but got: " + ((GenericData.Array<Object>) afterDecodingWithoutJavaString.get(2)).get(0).getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecodingWithoutJavaString.get(3)).keySet().iterator().next() instanceof Utf8,
"Utf8 is expected, but got: " + ((Map<Object, Object>) afterDecodingWithoutJavaString.get(3)).keySet().iterator().next().getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecodingWithoutJavaString.get(3)).values().iterator().next() instanceof Utf8,
"Utf8 is expected, but got: " + ((Map<Object, Object>) afterDecodingWithoutJavaString.get(3)).values().iterator().next().getClass());

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public class FastDeserializerGenerator<T> extends FastDeserializerGeneratorBase<
private static final Supplier<JExpression> EMPTY_SUPPLIER = () -> JExpr._null();

private JMethod constructor;
private Map<Long, Schema> schemaMap = new HashMap<>();
private Map<Long, JVar> schemaVarMap = new HashMap<>();
private Map<Integer, Schema> schemaMap = new HashMap<>();
private Map<Integer, JVar> schemaVarMap = new HashMap<>();
private Map<String, JMethod> deserializeMethodMap = new HashMap<>();
private Map<String, JMethod> skipMethodMap = new HashMap<>();
private Map<JMethod, Set<Class<? extends Exception>>> exceptionFromMethodMap = new HashMap<>();
Expand Down Expand Up @@ -1246,11 +1246,12 @@ private JVar declareSchemaVar(Schema valueSchema, String variableName, JInvocati
*/
if (SchemaAssistant.isComplexType(valueSchema) || Schema.Type.ENUM.equals(valueSchema.getType())
|| Schema.Type.FIXED.equals(valueSchema.getType())) {
long schemaId = Utils.getSchemaFingerprint(valueSchema);
if (schemaVarMap.get(schemaId) != null) {
return schemaVarMap.get(schemaId);
int schemaId = Utils.getSchemaFingerprint(valueSchema);
JVar schemaVar = schemaVarMap.get(schemaId);
if (schemaVar != null) {
return schemaVar;
} else {
JVar schemaVar = generatedClass.field(JMod.PRIVATE | JMod.FINAL, Schema.class,
schemaVar = generatedClass.field(JMod.PRIVATE | JMod.FINAL, Schema.class,
getUniqueName(StringUtils.uncapitalize(variableName)));
constructor.body().assign(JExpr.refthis(schemaVar.name()), getValueType);

Expand All @@ -1266,7 +1267,7 @@ private void registerSchema(final Schema writerSchema, JVar schemaVar) {
registerSchema(writerSchema, Utils.getSchemaFingerprint(writerSchema), schemaVar);
}

private void registerSchema(final Schema writerSchema, long schemaId, JVar schemaVar) {
private void registerSchema(final Schema writerSchema, int schemaId, JVar schemaVar) {
if ((Schema.Type.RECORD.equals(writerSchema.getType()) || Schema.Type.ENUM.equals(writerSchema.getType())
// TODO: Do we need `ARRAY` type here?
|| Schema.Type.FIXED.equals(writerSchema.getType()) || Schema.Type.ARRAY.equals(writerSchema.getType()))
Expand Down Expand Up @@ -1327,8 +1328,9 @@ private JMethod createMethod(final Schema writerSchema, final Schema readerSchem
}

private JExpression getSchemaExpr(Schema schema) {
Long index = Utils.getSchemaFingerprint(schema);
return (useGenericTypes && schemaVarMap.containsKey(index)) ? schemaVarMap.get(index) : JExpr._null();
int index = Utils.getSchemaFingerprint(schema);
JVar schemaVar = schemaVarMap.get(index);
return (useGenericTypes && schemaVar != null) ? schemaVar : JExpr._null();
}

private Supplier<JExpression> potentiallyCacheInvocation(Supplier<JExpression> jExpressionSupplier, JBlock body, String variableNamePrefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ protected static Symbol[] reverseSymbolArray(Symbol[] symbols) {
}

public static String getClassName(Schema writerSchema, Schema readerSchema, String description) {
Long writerSchemaId = Math.abs(Utils.getSchemaFingerprint(writerSchema));
Long readerSchemaId = Math.abs(Utils.getSchemaFingerprint(readerSchema));
int writerSchemaId = Math.abs(Utils.getSchemaFingerprint(writerSchema));
int readerSchemaId = Math.abs(Utils.getSchemaFingerprint(readerSchema));
String typeName = SchemaAssistant.getTypeName(readerSchema);
return typeName + SEP + description + "Deserializer" + SEP + writerSchemaId + SEP + readerSchemaId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class FastSerializerGenerator<T> extends FastSerdeBase {
/**
* Enum schema mapping for Avro-1.4 to record schema id and corresponding schema JVar.
*/
private final Map<Long, JVar> enumSchemaVarMap = new HashMap<>();
private final Map<Integer, JVar> enumSchemaVarMap = new HashMap<>();


public FastSerializerGenerator(boolean useGenericTypes, Schema schema, File destination, ClassLoader classLoader,
Expand All @@ -44,7 +44,7 @@ public FastSerializerGenerator(boolean useGenericTypes, Schema schema, File dest
}

public static String getClassName(Schema schema, String description) {
Long schemaId = Math.abs(Utils.getSchemaFingerprint(schema));
int schemaId = Math.abs(Utils.getSchemaFingerprint(schema));
String typeName = SchemaAssistant.getTypeName(schema);
return typeName + SEP + description + "Serializer" + SEP + schemaId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.avroutil1.compatibility.AvroVersion;
import com.linkedin.avroutil1.compatibility.SchemaNormalization;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
Expand Down Expand Up @@ -39,7 +38,7 @@ public class Utils {
}

// Cache the mapping between Schema and the corresponding fingerprint
private static final Map<Schema, Long> SCHEMA_IDS_CACHE = new ConcurrentHashMap<>();
private static final Map<Schema, Integer> SCHEMA_IDS_CACHE = new ConcurrentHashMap<>();

private Utils() {
}
Expand Down Expand Up @@ -108,13 +107,14 @@ public static String fixSeparatorsToMatchOS(String path) {
}
/**
* This function will produce a fingerprint for the provided schema.
*
* @param schema a schema
* @return fingerprint for the given schema
*/
public static Long getSchemaFingerprint(Schema schema) {
Long schemaId = SCHEMA_IDS_CACHE.get(schema);
public static int getSchemaFingerprint(Schema schema) {
Integer schemaId = SCHEMA_IDS_CACHE.get(schema);
if (schemaId == null) {
schemaId = SchemaNormalization.parsingFingerprint64(schema);
schemaId = schema.toString().hashCode();
SCHEMA_IDS_CACHE.put(schema, schemaId);
}

Expand Down

0 comments on commit d6ab13f

Please sign in to comment.