From d6ab13f3e42db5da31eacfd20eb24efb89265121 Mon Sep 17 00:00:00 2001 From: Felix GV Date: Thu, 17 Aug 2023 08:38:00 -0700 Subject: [PATCH 1/2] Tweaked the fast-avro schema fingerprinting logic It used to leverage the Avro fingerprinting, which is based on the "parsing canonical form" of the schema, but this ignores properties such as which class to use for String deserialization. This is a problem in cases where we wish to deserialize the same schema both with and without Java Strings, since the cache will return the same deserializer for both, ultimately leading to class cast issues. The new approach is to simply take the hash code of the full string of the schema. --- .../avro/fastserde/FastStringableTest.java | 54 ++++++++++++------- .../fastserde/FastDeserializerGenerator.java | 20 +++---- .../FastDeserializerGeneratorBase.java | 4 +- .../fastserde/FastSerializerGenerator.java | 4 +- .../com/linkedin/avro/fastserde/Utils.java | 10 ++-- 5 files changed, 54 insertions(+), 38 deletions(-) diff --git a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastStringableTest.java b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastStringableTest.java index 51d8c3197..53afab35f 100644 --- a/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastStringableTest.java +++ b/fastserde/avro-fastserde-tests-common/src/test/java/com/linkedin/avro/fastserde/FastStringableTest.java @@ -224,34 +224,48 @@ public void javaStringPropertyInReaderSchemaTest(Boolean whetherUseFastDeseriali stringMap.put("ddd", "eee"); record.put("testStringMap", stringMap); - Decoder decoder = writeWithFastAvro(record, writerSchema, false); + Decoder decoder1 = writeWithFastAvro(record, writerSchema, false); + Decoder decoder2 = writeWithFastAvro(record, writerSchema, false); - GenericRecord afterDecoding; + GenericRecord afterDecodingWithJavaString, afterDecodingWithoutJavaString; if 
(whetherUseFastDeserializer) { - afterDecoding = readWithFastAvro(writerSchema, readerSchema, decoder, false); + afterDecodingWithJavaString = readWithFastAvro(writerSchema, readerSchema, decoder1, false); + afterDecodingWithoutJavaString = readWithFastAvro(writerSchema, writerSchema, decoder2, false); } else { - afterDecoding = readWithSlowAvro(writerSchema, readerSchema, decoder, false); + afterDecodingWithJavaString = readWithSlowAvro(writerSchema, readerSchema, decoder1, false); + afterDecodingWithoutJavaString = readWithSlowAvro(writerSchema, writerSchema, decoder2, false); } if (Utils.isAbleToSupportJavaStrings()){ - Assert.assertTrue(afterDecoding.get(0) instanceof String, "String is expected, but got: " + afterDecoding.get(0).getClass()); - Assert.assertTrue(afterDecoding.get(1) instanceof String, "String is expected, but got: " + afterDecoding.get(0).getClass()); - Assert.assertTrue(((GenericData.Array) afterDecoding.get(2)).get(0) instanceof String, - "String is expected, but got: " + ((GenericData.Array) afterDecoding.get(2)).get(0).getClass()); - Assert.assertTrue(((Map) afterDecoding.get(3)).keySet().iterator().next() instanceof String, - "String is expected, but got: " + ((Map) afterDecoding.get(3)).keySet().iterator().next().getClass()); - Assert.assertTrue(((Map) afterDecoding.get(3)).values().iterator().next() instanceof String, - "String is expected, but got: " + ((Map) afterDecoding.get(3)).values().iterator().next().getClass()); + Assert.assertTrue(afterDecodingWithJavaString.get(0) instanceof String, "String is expected, but got: " + afterDecodingWithJavaString.get(0).getClass()); + Assert.assertTrue(afterDecodingWithJavaString.get(1) instanceof String, "String is expected, but got: " + afterDecodingWithJavaString.get(0).getClass()); + Assert.assertTrue(((GenericData.Array) afterDecodingWithJavaString.get(2)).get(0) instanceof String, + "String is expected, but got: " + ((GenericData.Array) 
afterDecodingWithJavaString.get(2)).get(0).getClass()); + Assert.assertTrue(((Map) afterDecodingWithJavaString.get(3)).keySet().iterator().next() instanceof String, + "String is expected, but got: " + ((Map) afterDecodingWithJavaString.get(3)).keySet().iterator().next().getClass()); + Assert.assertTrue(((Map) afterDecodingWithJavaString.get(3)).values().iterator().next() instanceof String, + "String is expected, but got: " + ((Map) afterDecodingWithJavaString.get(3)).values().iterator().next().getClass()); } else { - Assert.assertTrue(afterDecoding.get(0) instanceof Utf8, "Utf8 is expected, but got: " + afterDecoding.get(0).getClass()); - Assert.assertTrue(afterDecoding.get(1) instanceof Utf8, "Utf8 is expected, but got: " + afterDecoding.get(0).getClass()); - Assert.assertTrue(((GenericData.Array) afterDecoding.get(2)).get(0) instanceof Utf8, - "Utf8 is expected, but got: " + ((GenericData.Array) afterDecoding.get(2)).get(0).getClass()); - Assert.assertTrue(((Map) afterDecoding.get(3)).keySet().iterator().next() instanceof Utf8, - "Utf8 is expected, but got: " + ((Map) afterDecoding.get(3)).keySet().iterator().next().getClass()); - Assert.assertTrue(((Map) afterDecoding.get(3)).values().iterator().next() instanceof Utf8, - "Utf8 is expected, but got: " + ((Map) afterDecoding.get(3)).values().iterator().next().getClass()); + Assert.assertTrue(afterDecodingWithJavaString.get(0) instanceof Utf8, "Utf8 is expected, but got: " + afterDecodingWithJavaString.get(0).getClass()); + Assert.assertTrue(afterDecodingWithJavaString.get(1) instanceof Utf8, "Utf8 is expected, but got: " + afterDecodingWithJavaString.get(0).getClass()); + Assert.assertTrue(((GenericData.Array) afterDecodingWithJavaString.get(2)).get(0) instanceof Utf8, + "Utf8 is expected, but got: " + ((GenericData.Array) afterDecodingWithJavaString.get(2)).get(0).getClass()); + Assert.assertTrue(((Map) afterDecodingWithJavaString.get(3)).keySet().iterator().next() instanceof Utf8, + "Utf8 is expected, but got: " 
+ ((Map) afterDecodingWithJavaString.get(3)).keySet().iterator().next().getClass()); + Assert.assertTrue(((Map) afterDecodingWithJavaString.get(3)).values().iterator().next() instanceof Utf8, + "Utf8 is expected, but got: " + ((Map) afterDecodingWithJavaString.get(3)).values().iterator().next().getClass()); } + + // Regardless of the above, we should also be able to decode to Utf8 within the same JVM that already decoded Java Strings + Assert.assertTrue(afterDecodingWithoutJavaString.get(0) instanceof Utf8, "Utf8 is expected, but got: " + afterDecodingWithoutJavaString.get(0).getClass()); + Assert.assertTrue(afterDecodingWithoutJavaString.get(1) instanceof Utf8, "Utf8 is expected, but got: " + afterDecodingWithoutJavaString.get(0).getClass()); + Assert.assertTrue(((GenericData.Array) afterDecodingWithoutJavaString.get(2)).get(0) instanceof Utf8, + "Utf8 is expected, but got: " + ((GenericData.Array) afterDecodingWithoutJavaString.get(2)).get(0).getClass()); + Assert.assertTrue(((Map) afterDecodingWithoutJavaString.get(3)).keySet().iterator().next() instanceof Utf8, + "Utf8 is expected, but got: " + ((Map) afterDecodingWithoutJavaString.get(3)).keySet().iterator().next().getClass()); + Assert.assertTrue(((Map) afterDecodingWithoutJavaString.get(3)).values().iterator().next() instanceof Utf8, + "Utf8 is expected, but got: " + ((Map) afterDecodingWithoutJavaString.get(3)).values().iterator().next().getClass()); + } diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java index a8444d577..7463dd869 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGenerator.java @@ -70,8 +70,8 @@ public class FastDeserializerGenerator extends FastDeserializerGeneratorBase< private static 
final Supplier EMPTY_SUPPLIER = () -> JExpr._null(); private JMethod constructor; - private Map schemaMap = new HashMap<>(); - private Map schemaVarMap = new HashMap<>(); + private Map schemaMap = new HashMap<>(); + private Map schemaVarMap = new HashMap<>(); private Map deserializeMethodMap = new HashMap<>(); private Map skipMethodMap = new HashMap<>(); private Map>> exceptionFromMethodMap = new HashMap<>(); @@ -1246,11 +1246,12 @@ private JVar declareSchemaVar(Schema valueSchema, String variableName, JInvocati */ if (SchemaAssistant.isComplexType(valueSchema) || Schema.Type.ENUM.equals(valueSchema.getType()) || Schema.Type.FIXED.equals(valueSchema.getType())) { - long schemaId = Utils.getSchemaFingerprint(valueSchema); - if (schemaVarMap.get(schemaId) != null) { - return schemaVarMap.get(schemaId); + int schemaId = Utils.getSchemaFingerprint(valueSchema); + JVar schemaVar = schemaVarMap.get(schemaId); + if (schemaVar != null) { + return schemaVar; } else { - JVar schemaVar = generatedClass.field(JMod.PRIVATE | JMod.FINAL, Schema.class, + schemaVar = generatedClass.field(JMod.PRIVATE | JMod.FINAL, Schema.class, getUniqueName(StringUtils.uncapitalize(variableName))); constructor.body().assign(JExpr.refthis(schemaVar.name()), getValueType); @@ -1266,7 +1267,7 @@ private void registerSchema(final Schema writerSchema, JVar schemaVar) { registerSchema(writerSchema, Utils.getSchemaFingerprint(writerSchema), schemaVar); } - private void registerSchema(final Schema writerSchema, long schemaId, JVar schemaVar) { + private void registerSchema(final Schema writerSchema, int schemaId, JVar schemaVar) { if ((Schema.Type.RECORD.equals(writerSchema.getType()) || Schema.Type.ENUM.equals(writerSchema.getType()) // TODO: Do we need `ARRAY` type here? 
|| Schema.Type.FIXED.equals(writerSchema.getType()) || Schema.Type.ARRAY.equals(writerSchema.getType())) @@ -1327,8 +1328,9 @@ private JMethod createMethod(final Schema writerSchema, final Schema readerSchem } private JExpression getSchemaExpr(Schema schema) { - Long index = Utils.getSchemaFingerprint(schema); - return (useGenericTypes && schemaVarMap.containsKey(index)) ? schemaVarMap.get(index) : JExpr._null(); + int index = Utils.getSchemaFingerprint(schema); + JVar schemaVar = schemaVarMap.get(index); + return (useGenericTypes && schemaVar != null) ? schemaVar : JExpr._null(); } private Supplier potentiallyCacheInvocation(Supplier jExpressionSupplier, JBlock body, String variableNamePrefix) { diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorBase.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorBase.java index 017c0b277..c4166c700 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorBase.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDeserializerGeneratorBase.java @@ -37,8 +37,8 @@ protected static Symbol[] reverseSymbolArray(Symbol[] symbols) { } public static String getClassName(Schema writerSchema, Schema readerSchema, String description) { - Long writerSchemaId = Math.abs(Utils.getSchemaFingerprint(writerSchema)); - Long readerSchemaId = Math.abs(Utils.getSchemaFingerprint(readerSchema)); + int writerSchemaId = Math.abs(Utils.getSchemaFingerprint(writerSchema)); + int readerSchemaId = Math.abs(Utils.getSchemaFingerprint(readerSchema)); String typeName = SchemaAssistant.getTypeName(readerSchema); return typeName + SEP + description + "Deserializer" + SEP + writerSchemaId + SEP + readerSchemaId; } diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerializerGenerator.java 
b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerializerGenerator.java index 7d79c0acd..fe4c97534 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerializerGenerator.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastSerializerGenerator.java @@ -34,7 +34,7 @@ public class FastSerializerGenerator extends FastSerdeBase { /** * Enum schema mapping for Avro-1.4 to record schema id and corresponding schema JVar. */ - private final Map enumSchemaVarMap = new HashMap<>(); + private final Map enumSchemaVarMap = new HashMap<>(); public FastSerializerGenerator(boolean useGenericTypes, Schema schema, File destination, ClassLoader classLoader, @@ -44,7 +44,7 @@ public FastSerializerGenerator(boolean useGenericTypes, Schema schema, File dest } public static String getClassName(Schema schema, String description) { - Long schemaId = Math.abs(Utils.getSchemaFingerprint(schema)); + int schemaId = Math.abs(Utils.getSchemaFingerprint(schema)); String typeName = SchemaAssistant.getTypeName(schema); return typeName + SEP + description + "Serializer" + SEP + schemaId; } diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/Utils.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/Utils.java index 674344824..9fc5f84fa 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/Utils.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/Utils.java @@ -2,7 +2,6 @@ import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.avroutil1.compatibility.AvroVersion; -import com.linkedin.avroutil1.compatibility.SchemaNormalization; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; @@ -39,7 +38,7 @@ public class Utils { } // Cache the mapping between Schema and the corresponding fingerprint - private static final Map SCHEMA_IDS_CACHE = new 
ConcurrentHashMap<>(); + private static final Map SCHEMA_IDS_CACHE = new ConcurrentHashMap<>(); private Utils() { } @@ -108,13 +107,14 @@ public static String fixSeparatorsToMatchOS(String path) { } /** * This function will produce a fingerprint for the provided schema. + * * @param schema a schema * @return fingerprint for the given schema */ - public static Long getSchemaFingerprint(Schema schema) { - Long schemaId = SCHEMA_IDS_CACHE.get(schema); + public static int getSchemaFingerprint(Schema schema) { + Integer schemaId = SCHEMA_IDS_CACHE.get(schema); if (schemaId == null) { - schemaId = SchemaNormalization.parsingFingerprint64(schema); + schemaId = schema.toString().hashCode(); SCHEMA_IDS_CACHE.put(schema, schemaId); } From cf8bea2e2ddae664dd4e5f6ee70ea75c7d293b78 Mon Sep 17 00:00:00 2001 From: Felix GV Date: Thu, 17 Aug 2023 11:41:52 -0700 Subject: [PATCH 2/2] Made the id generation stable across runtimes. --- .../java/com/linkedin/avro/fastserde/Utils.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/Utils.java b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/Utils.java index 9fc5f84fa..36666be91 100644 --- a/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/Utils.java +++ b/fastserde/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/Utils.java @@ -2,11 +2,14 @@ import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper; import com.linkedin.avroutil1.compatibility.AvroVersion; +import com.linkedin.avroutil1.compatibility.AvscGenerationConfig; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.security.CodeSource; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; @@ -114,7 +117,19 @@ public static String 
fixSeparatorsToMatchOS(String path) { public static int getSchemaFingerprint(Schema schema) { Integer schemaId = SCHEMA_IDS_CACHE.get(schema); if (schemaId == null) { - schemaId = schema.toString().hashCode(); + String schemaString = AvroCompatibilityHelper.toAvsc(schema, AvscGenerationConfig.CORRECT_ONELINE); + try { + MessageDigest md = MessageDigest.getInstance("MD5"); + byte[] digest = md.digest(schemaString.getBytes(java.nio.charset.StandardCharsets.UTF_8)); + int scratchPad = 0; + for (int i = 0; i < digest.length; i++) { + scratchPad = (scratchPad * 256 + (digest[i] & 0xFF)); + } + schemaId = scratchPad; + } catch (NoSuchAlgorithmException e) { + schemaId = schemaString.hashCode(); + } + SCHEMA_IDS_CACHE.put(schema, schemaId); }