diff --git a/avro-fastserde/build.gradle b/avro-fastserde/build.gradle index b02a026cf..88bc49e0c 100644 --- a/avro-fastserde/build.gradle +++ b/avro-fastserde/build.gradle @@ -43,6 +43,7 @@ dependencies { compile "org.apache.commons:commons-lang3:3.4" compile "com.sun.codemodel:codemodel:2.6" compile "com.google.guava:guava:19.0" + compile "org.jboss:jboss-common-core:2.5.0.Final" // By default, the compile and testCompile configuration is using avro-1.8, and // if you need to switch to an old version of Avro, we need to make diff --git a/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastAvroConcurrentHashMap.java b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastAvroConcurrentHashMap.java new file mode 100644 index 000000000..24282b404 --- /dev/null +++ b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastAvroConcurrentHashMap.java @@ -0,0 +1,40 @@ +package com.linkedin.avro.fastserde; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; + + +public class FastAvroConcurrentHashMap extends ConcurrentHashMap { + public FastAvroConcurrentHashMap() { + super(); + } + + public FastAvroConcurrentHashMap(int initialCapacity) { + super(initialCapacity); + } + + /** + * The native `computeIfAbsent` function implemented in Java could have contention when + * the value already exists {@link ConcurrentHashMap#computeIfAbsent(Object, Function)}; + * the contention could become very bad when lots of threads are trying to "computeIfAbsent" + * on the same key, which is a known Java bug: https://bugs.openjdk.java.net/browse/JDK-8161372 + * + * This internal FastAvroConcurrentHashMap mitigate such contention by trying to get the + * value first before invoking "computeIfAbsent", which brings great optimization if the major + * workload is trying to get the same key over and over again; however, we speculate that this + * optimization might not be ideal for the workload that most keys are unique, so be cautious + * about the workload before adopting the FastAvroConcurrentHashMap. + * + * @param key + * @param mappingFunction + * @return + */ + @Override + public V computeIfAbsent(K key, Function mappingFunction) { + V value = get(key); + if (value != null) { + return value; + } + return super.computeIfAbsent(key, mappingFunction); + } +} diff --git a/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDatumReaderWriterUtil.java b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDatumReaderWriterUtil.java new file mode 100644 index 000000000..4c3f844e2 --- /dev/null +++ b/avro-fastserde/src/main/java/com/linkedin/avro/fastserde/FastDatumReaderWriterUtil.java @@ -0,0 +1,124 @@ +package com.linkedin.avro.fastserde; + +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.avro.Schema; +import org.jboss.util.collection.WeakIdentityHashMap; + +/** + * Utility class to generate and reuse FastDatumReader/Writer. The cache key is schema object. + * + * Pre-requisite to use this util: + * 1. Schema object will not be changed on-the-FLY, or the changes do NOT require new DatumReader/Writer; + * 2. Your application will always use the same Schema object for the same schema; + */ +public class FastDatumReaderWriterUtil { + + protected static class SchemaPair { + private final Schema writerSchema; + private final Schema readerSchema; + private final int hashCode; + + public SchemaPair(Schema writerSchema, Schema readerSchema) { + this.writerSchema = writerSchema; + this.readerSchema = readerSchema; + this.hashCode = Objects.hash(System.identityHashCode(this.writerSchema), System.identityHashCode(this.readerSchema)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SchemaPair that = (SchemaPair) o; + return writerSchema == that.writerSchema && readerSchema == that.readerSchema; + } + + @Override + public int hashCode() { + return hashCode; + } + } + + private static final ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(); + + //TODO : LRU cache + private static final Map> fastGenericDatumReaderCache = new FastAvroConcurrentHashMap<>(); + private static final Map> fastGenericDatumWriterCache = new WeakIdentityHashMap(); + + private static final Map> fastSpecificDatumReaderCache = new FastAvroConcurrentHashMap<>(); + private static final Map> fastSpecificDatumWriterCache = new WeakIdentityHashMap(); + + private FastDatumReaderWriterUtil() { + } + + public static FastGenericDatumReader getFastGenericDatumReader(Schema schema) { + return (FastGenericDatumReader) getFastGenericDatumReader(schema, schema); + } + + public static FastGenericDatumReader getFastGenericDatumReader(Schema writerSchema, Schema readerSchema) { + SchemaPair schemaPair = new SchemaPair(writerSchema, readerSchema); + return (FastGenericDatumReader) fastGenericDatumReaderCache.computeIfAbsent(schemaPair, key -> new FastGenericDatumReader<>(writerSchema, readerSchema)); + } + + public static FastGenericDatumWriter getFastGenericDatumWriter(Schema writerSchema) { + FastGenericDatumWriter fastDatumWriter = null; + + // lookup cache and read lock + reentrantReadWriteLock.readLock().lock(); + try { + fastDatumWriter = (FastGenericDatumWriter)fastGenericDatumWriterCache.get(writerSchema); + } finally { + reentrantReadWriteLock.readLock().unlock(); + } + // update cache and write lock + if (fastDatumWriter == null) { + reentrantReadWriteLock.writeLock().lock(); + try { + fastDatumWriter = new FastGenericDatumWriter<>(writerSchema); + fastGenericDatumWriterCache.put(writerSchema, fastDatumWriter); + } finally { + reentrantReadWriteLock.writeLock().unlock(); + } + } + return (FastGenericDatumWriter ) fastDatumWriter; + } + + + public static FastSpecificDatumReader getFastSpecificDatumReader(Schema schema) { + return (FastSpecificDatumReader) getFastSpecificDatumReader(schema, schema); + } + + public static FastSpecificDatumReader getFastSpecificDatumReader(Schema writerSchema, Schema readerSchema) { + SchemaPair schemaPair = new SchemaPair(writerSchema, readerSchema); + return (FastSpecificDatumReader) fastSpecificDatumReaderCache.computeIfAbsent(schemaPair, key -> new FastSpecificDatumReader<>(writerSchema, readerSchema)); + } + + public static FastSpecificDatumWriter getFastSpecificDatumWriter(Schema writerSchema) { + FastSpecificDatumWriter fastDatumWriter = null; + + // lookup cache and read lock + reentrantReadWriteLock.readLock().lock(); + try { + fastDatumWriter = (FastSpecificDatumWriter)fastSpecificDatumWriterCache.get(writerSchema); + } finally { + reentrantReadWriteLock.readLock().unlock(); + } + // update cache and write lock + if (fastDatumWriter == null) { + reentrantReadWriteLock.writeLock().lock(); + try { + fastDatumWriter = new FastSpecificDatumWriter<>(writerSchema); + fastSpecificDatumWriterCache.put(writerSchema, fastDatumWriter); + } finally { + reentrantReadWriteLock.writeLock().unlock(); + } + } + return (FastSpecificDatumWriter) fastDatumWriter; + } +} \ No newline at end of file diff --git a/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/FastDatumReaderWriterUtilTest.java b/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/FastDatumReaderWriterUtilTest.java new file mode 100644 index 000000000..5e2876d86 --- /dev/null +++ b/avro-fastserde/src/test/java/com/linkedin/avro/fastserde/FastDatumReaderWriterUtilTest.java @@ -0,0 +1,66 @@ +package com.linkedin.avro.fastserde; + +import org.apache.avro.Schema; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class FastDatumReaderWriterUtilTest { + + @Test (groups = "serializationTest") + public void testIsSupportedForFastGenericDatumWriter() { + Schema testSchema = Schema.parse("{\"type\": \"record\", \"name\": \"test_record\", \"fields\":[]}"); + FastGenericDatumWriter fastWriter = FastDatumReaderWriterUtil.getFastGenericDatumWriter(testSchema); + Assert.assertNotNull(fastWriter); + FastGenericDatumWriter newFastWriter = FastDatumReaderWriterUtil.getFastGenericDatumWriter(testSchema); + Assert.assertSame(fastWriter, newFastWriter); + } + + @Test (groups = "deserializationTest") + public void testIsSupportedForFastGenericDatumReader() { + Schema testWriterSchema = Schema.parse("{\"type\": \"record\", \"name\": \"test_record\", \"fields\":[]}"); + Schema testReaderSchema = Schema.parse("{\"type\": \"record\", \"name\": \"test_record\", \"fields\":[]}"); + FastGenericDatumReader fastReader = FastDatumReaderWriterUtil.getFastGenericDatumReader(testWriterSchema, testReaderSchema); + Assert.assertNotNull(fastReader); + FastGenericDatumReader newFastReader = FastDatumReaderWriterUtil.getFastGenericDatumReader(testWriterSchema, testReaderSchema); + Assert.assertSame(fastReader, newFastReader); + } + + @Test (groups = "deserializationTest") + public void testIsSupportedForFastGenericDatumReaderWithSameReaderWriterSchema() { + Schema testSchema = Schema.parse("{\"type\": \"record\", \"name\": \"test_record\", \"fields\":[]}"); + FastGenericDatumReader fastReader = FastDatumReaderWriterUtil.getFastGenericDatumReader(testSchema); + Assert.assertNotNull(fastReader); + FastGenericDatumReader newFastReader = FastDatumReaderWriterUtil.getFastGenericDatumReader(testSchema); + Assert.assertSame(fastReader, newFastReader); + } + + @Test (groups = "serializationTest") + public void testIsSupportedForFastSpecificDatumWriter() { + Schema testSchema = Schema.parse("{\"type\": \"record\", \"name\": \"test_record\", \"fields\":[]}"); + FastSpecificDatumWriter fastWriter = FastDatumReaderWriterUtil.getFastSpecificDatumWriter(testSchema); + Assert.assertNotNull(fastWriter); + FastSpecificDatumWriter newFastWriter = FastDatumReaderWriterUtil.getFastSpecificDatumWriter(testSchema); + Assert.assertSame(fastWriter, newFastWriter); + } + + @Test (groups = "deserializationTest") + public void testIsSupportedForFastSpecificDatumReader() { + Schema testWriterSchema = Schema.parse("{\"type\": \"record\", \"name\": \"test_record\", \"fields\":[]}"); + Schema testReaderSchema = Schema.parse("{\"type\": \"record\", \"name\": \"test_record\", \"fields\":[]}"); + FastSpecificDatumReader fastReader = FastDatumReaderWriterUtil.getFastSpecificDatumReader(testWriterSchema, testReaderSchema); + Assert.assertNotNull(fastReader); + FastSpecificDatumReader newFastReader = FastDatumReaderWriterUtil.getFastSpecificDatumReader(testWriterSchema, testReaderSchema); + Assert.assertSame(fastReader, newFastReader); + } + + @Test (groups = "deserializationTest") + public void testIsSupportedForFastSpecificDatumReaderWithSameReaderWriterSchema() { + Schema testSchema = Schema.parse("{\"type\": \"record\", \"name\": \"test_record\", \"fields\":[]}"); + FastSpecificDatumReader fastReader = FastDatumReaderWriterUtil.getFastSpecificDatumReader(testSchema); + Assert.assertNotNull(fastReader); + FastSpecificDatumReader newFastReader = FastDatumReaderWriterUtil.getFastSpecificDatumReader(testSchema); + Assert.assertSame(fastReader, newFastReader); + } + +}