Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tweaked the fast-avro schema fingerprinting logic #508

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -224,34 +224,48 @@ public void javaStringPropertyInReaderSchemaTest(Boolean whetherUseFastDeseriali
stringMap.put("ddd", "eee");
record.put("testStringMap", stringMap);

Decoder decoder = writeWithFastAvro(record, writerSchema, false);
Decoder decoder1 = writeWithFastAvro(record, writerSchema, false);
Decoder decoder2 = writeWithFastAvro(record, writerSchema, false);

GenericRecord afterDecoding;
GenericRecord afterDecodingWithJavaString, afterDecodingWithoutJavaString;
if (whetherUseFastDeserializer) {
afterDecoding = readWithFastAvro(writerSchema, readerSchema, decoder, false);
afterDecodingWithJavaString = readWithFastAvro(writerSchema, readerSchema, decoder1, false);
afterDecodingWithoutJavaString = readWithFastAvro(writerSchema, writerSchema, decoder2, false);
} else {
afterDecoding = readWithSlowAvro(writerSchema, readerSchema, decoder, false);
afterDecodingWithJavaString = readWithSlowAvro(writerSchema, readerSchema, decoder1, false);
afterDecodingWithoutJavaString = readWithSlowAvro(writerSchema, writerSchema, decoder2, false);
}

if (Utils.isAbleToSupportJavaStrings()){
Assert.assertTrue(afterDecoding.get(0) instanceof String, "String is expected, but got: " + afterDecoding.get(0).getClass());
Assert.assertTrue(afterDecoding.get(1) instanceof String, "String is expected, but got: " + afterDecoding.get(0).getClass());
Assert.assertTrue(((GenericData.Array<Object>) afterDecoding.get(2)).get(0) instanceof String,
"String is expected, but got: " + ((GenericData.Array<Object>) afterDecoding.get(2)).get(0).getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecoding.get(3)).keySet().iterator().next() instanceof String,
"String is expected, but got: " + ((Map<Object, Object>) afterDecoding.get(3)).keySet().iterator().next().getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecoding.get(3)).values().iterator().next() instanceof String,
"String is expected, but got: " + ((Map<Object, Object>) afterDecoding.get(3)).values().iterator().next().getClass());
Assert.assertTrue(afterDecodingWithJavaString.get(0) instanceof String, "String is expected, but got: " + afterDecodingWithJavaString.get(0).getClass());
Assert.assertTrue(afterDecodingWithJavaString.get(1) instanceof String, "String is expected, but got: " + afterDecodingWithJavaString.get(0).getClass());
Assert.assertTrue(((GenericData.Array<Object>) afterDecodingWithJavaString.get(2)).get(0) instanceof String,
"String is expected, but got: " + ((GenericData.Array<Object>) afterDecodingWithJavaString.get(2)).get(0).getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecodingWithJavaString.get(3)).keySet().iterator().next() instanceof String,
"String is expected, but got: " + ((Map<Object, Object>) afterDecodingWithJavaString.get(3)).keySet().iterator().next().getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecodingWithJavaString.get(3)).values().iterator().next() instanceof String,
"String is expected, but got: " + ((Map<Object, Object>) afterDecodingWithJavaString.get(3)).values().iterator().next().getClass());
} else {
Assert.assertTrue(afterDecoding.get(0) instanceof Utf8, "Utf8 is expected, but got: " + afterDecoding.get(0).getClass());
Assert.assertTrue(afterDecoding.get(1) instanceof Utf8, "Utf8 is expected, but got: " + afterDecoding.get(0).getClass());
Assert.assertTrue(((GenericData.Array<Object>) afterDecoding.get(2)).get(0) instanceof Utf8,
"Utf8 is expected, but got: " + ((GenericData.Array<Object>) afterDecoding.get(2)).get(0).getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecoding.get(3)).keySet().iterator().next() instanceof Utf8,
"Utf8 is expected, but got: " + ((Map<Object, Object>) afterDecoding.get(3)).keySet().iterator().next().getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecoding.get(3)).values().iterator().next() instanceof Utf8,
"Utf8 is expected, but got: " + ((Map<Object, Object>) afterDecoding.get(3)).values().iterator().next().getClass());
Assert.assertTrue(afterDecodingWithJavaString.get(0) instanceof Utf8, "Utf8 is expected, but got: " + afterDecodingWithJavaString.get(0).getClass());
Assert.assertTrue(afterDecodingWithJavaString.get(1) instanceof Utf8, "Utf8 is expected, but got: " + afterDecodingWithJavaString.get(0).getClass());
Assert.assertTrue(((GenericData.Array<Object>) afterDecodingWithJavaString.get(2)).get(0) instanceof Utf8,
"Utf8 is expected, but got: " + ((GenericData.Array<Object>) afterDecodingWithJavaString.get(2)).get(0).getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecodingWithJavaString.get(3)).keySet().iterator().next() instanceof Utf8,
"Utf8 is expected, but got: " + ((Map<Object, Object>) afterDecodingWithJavaString.get(3)).keySet().iterator().next().getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecodingWithJavaString.get(3)).values().iterator().next() instanceof Utf8,
"Utf8 is expected, but got: " + ((Map<Object, Object>) afterDecodingWithJavaString.get(3)).values().iterator().next().getClass());
}

// Regardless of the above, we should also be able to decode to Utf8 within the same JVM that already decoded Java Strings
Assert.assertTrue(afterDecodingWithoutJavaString.get(0) instanceof Utf8, "Utf8 is expected, but got: " + afterDecodingWithoutJavaString.get(0).getClass());
Assert.assertTrue(afterDecodingWithoutJavaString.get(1) instanceof Utf8, "Utf8 is expected, but got: " + afterDecodingWithoutJavaString.get(0).getClass());
Assert.assertTrue(((GenericData.Array<Object>) afterDecodingWithoutJavaString.get(2)).get(0) instanceof Utf8,
"Utf8 is expected, but got: " + ((GenericData.Array<Object>) afterDecodingWithoutJavaString.get(2)).get(0).getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecodingWithoutJavaString.get(3)).keySet().iterator().next() instanceof Utf8,
"Utf8 is expected, but got: " + ((Map<Object, Object>) afterDecodingWithoutJavaString.get(3)).keySet().iterator().next().getClass());
Assert.assertTrue(((Map<Object, Object>) afterDecodingWithoutJavaString.get(3)).values().iterator().next() instanceof Utf8,
"Utf8 is expected, but got: " + ((Map<Object, Object>) afterDecodingWithoutJavaString.get(3)).values().iterator().next().getClass());

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public class FastDeserializerGenerator<T> extends FastDeserializerGeneratorBase<
private static final Supplier<JExpression> EMPTY_SUPPLIER = () -> JExpr._null();

private JMethod constructor;
private Map<Long, Schema> schemaMap = new HashMap<>();
private Map<Long, JVar> schemaVarMap = new HashMap<>();
private Map<Integer, Schema> schemaMap = new HashMap<>();
private Map<Integer, JVar> schemaVarMap = new HashMap<>();
private Map<String, JMethod> deserializeMethodMap = new HashMap<>();
private Map<String, JMethod> skipMethodMap = new HashMap<>();
private Map<JMethod, Set<Class<? extends Exception>>> exceptionFromMethodMap = new HashMap<>();
Expand Down Expand Up @@ -1246,11 +1246,12 @@ private JVar declareSchemaVar(Schema valueSchema, String variableName, JInvocati
*/
if (SchemaAssistant.isComplexType(valueSchema) || Schema.Type.ENUM.equals(valueSchema.getType())
|| Schema.Type.FIXED.equals(valueSchema.getType())) {
long schemaId = Utils.getSchemaFingerprint(valueSchema);
if (schemaVarMap.get(schemaId) != null) {
return schemaVarMap.get(schemaId);
int schemaId = Utils.getSchemaFingerprint(valueSchema);
JVar schemaVar = schemaVarMap.get(schemaId);
if (schemaVar != null) {
return schemaVar;
} else {
JVar schemaVar = generatedClass.field(JMod.PRIVATE | JMod.FINAL, Schema.class,
schemaVar = generatedClass.field(JMod.PRIVATE | JMod.FINAL, Schema.class,
getUniqueName(StringUtils.uncapitalize(variableName)));
constructor.body().assign(JExpr.refthis(schemaVar.name()), getValueType);

Expand All @@ -1266,7 +1267,7 @@ private void registerSchema(final Schema writerSchema, JVar schemaVar) {
registerSchema(writerSchema, Utils.getSchemaFingerprint(writerSchema), schemaVar);
}

private void registerSchema(final Schema writerSchema, long schemaId, JVar schemaVar) {
private void registerSchema(final Schema writerSchema, int schemaId, JVar schemaVar) {
if ((Schema.Type.RECORD.equals(writerSchema.getType()) || Schema.Type.ENUM.equals(writerSchema.getType())
// TODO: Do we need `ARRAY` type here?
|| Schema.Type.FIXED.equals(writerSchema.getType()) || Schema.Type.ARRAY.equals(writerSchema.getType()))
Expand Down Expand Up @@ -1327,8 +1328,9 @@ private JMethod createMethod(final Schema writerSchema, final Schema readerSchem
}

private JExpression getSchemaExpr(Schema schema) {
Long index = Utils.getSchemaFingerprint(schema);
return (useGenericTypes && schemaVarMap.containsKey(index)) ? schemaVarMap.get(index) : JExpr._null();
int index = Utils.getSchemaFingerprint(schema);
JVar schemaVar = schemaVarMap.get(index);
return (useGenericTypes && schemaVar != null) ? schemaVar : JExpr._null();
}

private Supplier<JExpression> potentiallyCacheInvocation(Supplier<JExpression> jExpressionSupplier, JBlock body, String variableNamePrefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ protected static Symbol[] reverseSymbolArray(Symbol[] symbols) {
}

public static String getClassName(Schema writerSchema, Schema readerSchema, String description) {
Long writerSchemaId = Math.abs(Utils.getSchemaFingerprint(writerSchema));
Long readerSchemaId = Math.abs(Utils.getSchemaFingerprint(readerSchema));
int writerSchemaId = Math.abs(Utils.getSchemaFingerprint(writerSchema));
int readerSchemaId = Math.abs(Utils.getSchemaFingerprint(readerSchema));
String typeName = SchemaAssistant.getTypeName(readerSchema);
return typeName + SEP + description + "Deserializer" + SEP + writerSchemaId + SEP + readerSchemaId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class FastSerializerGenerator<T> extends FastSerdeBase {
/**
* Enum schema mapping for Avro-1.4 to record schema id and corresponding schema JVar.
*/
private final Map<Long, JVar> enumSchemaVarMap = new HashMap<>();
private final Map<Integer, JVar> enumSchemaVarMap = new HashMap<>();


public FastSerializerGenerator(boolean useGenericTypes, Schema schema, File destination, ClassLoader classLoader,
Expand All @@ -44,7 +44,7 @@ public FastSerializerGenerator(boolean useGenericTypes, Schema schema, File dest
}

public static String getClassName(Schema schema, String description) {
Long schemaId = Math.abs(Utils.getSchemaFingerprint(schema));
int schemaId = Math.abs(Utils.getSchemaFingerprint(schema));
String typeName = SchemaAssistant.getTypeName(schema);
return typeName + SEP + description + "Serializer" + SEP + schemaId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
import com.linkedin.avroutil1.compatibility.AvroVersion;
import com.linkedin.avroutil1.compatibility.SchemaNormalization;
import com.linkedin.avroutil1.compatibility.AvscGenerationConfig;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.security.CodeSource;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -39,7 +41,7 @@ public class Utils {
}

// Cache the mapping between Schema and the corresponding fingerprint
private static final Map<Schema, Long> SCHEMA_IDS_CACHE = new ConcurrentHashMap<>();
private static final Map<Schema, Integer> SCHEMA_IDS_CACHE = new ConcurrentHashMap<>();

private Utils() {
}
Expand Down Expand Up @@ -108,13 +110,26 @@ public static String fixSeparatorsToMatchOS(String path) {
}
/**
* This function will produce a fingerprint for the provided schema.
*
* @param schema a schema
* @return fingerprint for the given schema
*/
public static Long getSchemaFingerprint(Schema schema) {
Long schemaId = SCHEMA_IDS_CACHE.get(schema);
public static int getSchemaFingerprint(Schema schema) {
Integer schemaId = SCHEMA_IDS_CACHE.get(schema);
if (schemaId == null) {
schemaId = SchemaNormalization.parsingFingerprint64(schema);
String schemaString = AvroCompatibilityHelper.toAvsc(schema, AvscGenerationConfig.CORRECT_ONELINE);
try {
MessageDigest md = MessageDigest.getInstance("MD5");
byte[] digest = md.digest(schemaString.getBytes());
int scratchPad = 0;
for (int i = 0; i < digest.length; i++) {
scratchPad = (scratchPad * 256 + (digest[i] & 0xFF));
}
schemaId = scratchPad;
} catch (NoSuchAlgorithmException e) {
schemaId = schemaString.hashCode();
}

SCHEMA_IDS_CACHE.put(schema, schemaId);
}

Expand Down
Loading