Skip to content

Commit

Permalink
Fast Datum Reader/Writer Util with WeakIdentityHashMap (#37)
Browse files Browse the repository at this point in the history
* Fast Datum Reader/Writer Util with WeakIdentityHashMap

* Created FastAvroConcurrentHashMap, a wrapper around ConcurrentHashMap with performance improvements

* Changed datum reader/writer to generic types

Co-authored-by: prkrishn <[email protected]>
  • Loading branch information
pravk03 and prkrishn authored Apr 1, 2020
1 parent cde54ff commit a20fd47
Show file tree
Hide file tree
Showing 4 changed files with 231 additions and 0 deletions.
1 change: 1 addition & 0 deletions avro-fastserde/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies {
compile "org.apache.commons:commons-lang3:3.4"
compile "com.sun.codemodel:codemodel:2.6"
compile "com.google.guava:guava:19.0"
compile "org.jboss:jboss-common-core:2.5.0.Final"

// By default, the compile and testCompile configuration is using avro-1.8, and
// if you need to switch to an old version of Avro, we need to make
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.linkedin.avro.fastserde;

import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;


/**
 * A {@link ConcurrentHashMap} that answers {@code computeIfAbsent} with a plain {@code get}
 * whenever the key is already mapped, and only falls back to the parent implementation on a miss.
 *
 * @param <K> type of the keys kept in this map
 * @param <V> type of the mapped values
 */
public class FastAvroConcurrentHashMap<K, V> extends ConcurrentHashMap<K, V> {

  /** Creates an empty map using the parent class's default sizing. */
  public FastAvroConcurrentHashMap() {
    super();
  }

  /**
   * Creates an empty map, forwarding the sizing hint to the parent constructor.
   *
   * @param initialCapacity initial capacity passed straight to {@link ConcurrentHashMap}
   */
  public FastAvroConcurrentHashMap(int initialCapacity) {
    super(initialCapacity);
  }

  /**
   * Returns the value mapped to {@code key}, computing and storing it first when absent.
   *
   * <p>The stock {@link ConcurrentHashMap#computeIfAbsent(Object, Function)} can suffer from
   * contention even when the value already exists, and that contention gets very bad when many
   * threads call it for the same key. This is a known Java bug:
   * https://bugs.openjdk.java.net/browse/JDK-8161372
   *
   * <p>To mitigate it, this override looks the key up with {@code get} first and only delegates
   * to the parent {@code computeIfAbsent} when nothing is found. That pays off when the dominant
   * workload fetches the same keys over and over. It is speculated to be less suitable when most
   * keys are unique (every call then pays for the extra lookup), so consider the workload before
   * adopting this class.
   *
   * @param key key whose value is wanted
   * @param mappingFunction function used to compute the value when the key is not yet mapped
   * @return the existing value if present, otherwise whatever the parent
   *     {@code computeIfAbsent} returns
   */
  @Override
  public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
    final V existing = get(key);
    // Fast path: a hit is returned directly; only a miss goes through the parent.
    return existing != null ? existing : super.computeIfAbsent(key, mappingFunction);
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package com.linkedin.avro.fastserde;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.avro.Schema;
import org.jboss.util.collection.WeakIdentityHashMap;

/**
 * Utility class to generate and reuse FastDatumReader/Writer instances. The cache key is the
 * schema object itself: lookups compare schemas by reference, not by content.
 *
 * Pre-requisites to use this util:
 * 1. Schema objects will not be changed on the fly, or the changes do NOT require a new
 *    DatumReader/Writer;
 * 2. Your application will always use the same Schema object for the same schema.
 */
public class FastDatumReaderWriterUtil {

  /**
   * Cache key for readers: a (writer schema, reader schema) pair compared by reference identity.
   * The hash code is derived from the identity hash codes of both schemas and computed once.
   */
  protected static class SchemaPair {
    private final Schema writerSchema;
    private final Schema readerSchema;
    private final int hashCode;

    /**
     * @param writerSchema schema the data was written with
     * @param readerSchema schema the data should be read with
     */
    public SchemaPair(Schema writerSchema, Schema readerSchema) {
      this.writerSchema = writerSchema;
      this.readerSchema = readerSchema;
      this.hashCode = Objects.hash(System.identityHashCode(this.writerSchema), System.identityHashCode(this.readerSchema));
    }

    /** Two pairs are equal only when both schemas are the very same objects. */
    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      SchemaPair that = (SchemaPair) o;
      return writerSchema == that.writerSchema && readerSchema == that.readerSchema;
    }

    @Override
    public int hashCode() {
      return hashCode;
    }
  }

  // Guards both writer caches below; the reader caches are concurrent maps and need no lock.
  // NOTE(review): WeakIdentityHashMap appears to be modelled on java.util.WeakHashMap, whose
  // get() may expunge stale entries (i.e. mutate the table). If so, concurrent get() calls under
  // the shared read lock are not safe and an exclusive lock is required -- confirm against the
  // jboss-common-core sources.
  private static final ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();

  //TODO : LRU cache
  private static final Map<SchemaPair, FastGenericDatumReader<?>> fastGenericDatumReaderCache = new FastAvroConcurrentHashMap<>();
  // NOTE(review): if a cached writer keeps a strong reference to its schema, the weak key can
  // never be collected and the entry lives forever -- confirm whether that is the case.
  private static final Map<Schema, FastGenericDatumWriter<?>> fastGenericDatumWriterCache = new WeakIdentityHashMap();

  private static final Map<SchemaPair, FastSpecificDatumReader<?>> fastSpecificDatumReaderCache = new FastAvroConcurrentHashMap<>();
  private static final Map<Schema, FastSpecificDatumWriter<?>> fastSpecificDatumWriterCache = new WeakIdentityHashMap();

  private FastDatumReaderWriterUtil() {
  }

  /**
   * Returns the cached generic reader that uses {@code schema} as both writer and reader schema.
   *
   * @param schema schema used for writing and reading
   * @return the cached reader, created on first request
   */
  public static <T> FastGenericDatumReader<T> getFastGenericDatumReader(Schema schema) {
    return getFastGenericDatumReader(schema, schema);
  }

  /**
   * Returns the cached generic reader for the given schema pair, creating it on first request.
   *
   * @param writerSchema schema the data was written with
   * @param readerSchema schema the data should be read with
   * @return the reader cached for exactly these two schema objects
   */
  @SuppressWarnings("unchecked") // the cache stores readers as <?>; T is chosen by the caller
  public static <T> FastGenericDatumReader<T> getFastGenericDatumReader(Schema writerSchema, Schema readerSchema) {
    SchemaPair schemaPair = new SchemaPair(writerSchema, readerSchema);
    return (FastGenericDatumReader<T>) fastGenericDatumReaderCache.computeIfAbsent(schemaPair, key -> new FastGenericDatumReader<>(writerSchema, readerSchema));
  }

  /**
   * Returns the cached generic writer for {@code writerSchema}, creating it on first request.
   *
   * @param writerSchema schema the writer serializes with; also the (identity) cache key
   * @return the writer cached for this schema object
   */
  @SuppressWarnings("unchecked") // the cache stores writers as <?>; T is chosen by the caller
  public static <T> FastGenericDatumWriter<T> getFastGenericDatumWriter(Schema writerSchema) {
    FastGenericDatumWriter<T> fastDatumWriter;

    // lookup cache and read lock
    reentrantReadWriteLock.readLock().lock();
    try {
      fastDatumWriter = (FastGenericDatumWriter<T>) fastGenericDatumWriterCache.get(writerSchema);
    } finally {
      reentrantReadWriteLock.readLock().unlock();
    }
    // update cache and write lock
    if (fastDatumWriter == null) {
      reentrantReadWriteLock.writeLock().lock();
      try {
        // Re-check: another thread may have populated the entry between releasing the read lock
        // and acquiring the write lock. Without this, racing callers would each build a writer
        // and end up holding different instances for the same schema.
        fastDatumWriter = (FastGenericDatumWriter<T>) fastGenericDatumWriterCache.get(writerSchema);
        if (fastDatumWriter == null) {
          fastDatumWriter = new FastGenericDatumWriter<>(writerSchema);
          fastGenericDatumWriterCache.put(writerSchema, fastDatumWriter);
        }
      } finally {
        reentrantReadWriteLock.writeLock().unlock();
      }
    }
    return fastDatumWriter;
  }

  /**
   * Returns the cached specific reader that uses {@code schema} as both writer and reader schema.
   *
   * @param schema schema used for writing and reading
   * @return the cached reader, created on first request
   */
  public static <T> FastSpecificDatumReader<T> getFastSpecificDatumReader(Schema schema) {
    return getFastSpecificDatumReader(schema, schema);
  }

  /**
   * Returns the cached specific reader for the given schema pair, creating it on first request.
   *
   * @param writerSchema schema the data was written with
   * @param readerSchema schema the data should be read with
   * @return the reader cached for exactly these two schema objects
   */
  @SuppressWarnings("unchecked") // the cache stores readers as <?>; T is chosen by the caller
  public static <T> FastSpecificDatumReader<T> getFastSpecificDatumReader(Schema writerSchema, Schema readerSchema) {
    SchemaPair schemaPair = new SchemaPair(writerSchema, readerSchema);
    return (FastSpecificDatumReader<T>) fastSpecificDatumReaderCache.computeIfAbsent(schemaPair, key -> new FastSpecificDatumReader<>(writerSchema, readerSchema));
  }

  /**
   * Returns the cached specific writer for {@code writerSchema}, creating it on first request.
   *
   * @param writerSchema schema the writer serializes with; also the (identity) cache key
   * @return the writer cached for this schema object
   */
  @SuppressWarnings("unchecked") // the cache stores writers as <?>; T is chosen by the caller
  public static <T> FastSpecificDatumWriter<T> getFastSpecificDatumWriter(Schema writerSchema) {
    FastSpecificDatumWriter<T> fastDatumWriter;

    // lookup cache and read lock
    reentrantReadWriteLock.readLock().lock();
    try {
      fastDatumWriter = (FastSpecificDatumWriter<T>) fastSpecificDatumWriterCache.get(writerSchema);
    } finally {
      reentrantReadWriteLock.readLock().unlock();
    }
    // update cache and write lock
    if (fastDatumWriter == null) {
      reentrantReadWriteLock.writeLock().lock();
      try {
        // Re-check under the write lock so racing callers do not each create (and overwrite)
        // a writer for the same schema.
        fastDatumWriter = (FastSpecificDatumWriter<T>) fastSpecificDatumWriterCache.get(writerSchema);
        if (fastDatumWriter == null) {
          fastDatumWriter = new FastSpecificDatumWriter<>(writerSchema);
          fastSpecificDatumWriterCache.put(writerSchema, fastDatumWriter);
        }
      } finally {
        reentrantReadWriteLock.writeLock().unlock();
      }
    }
    return fastDatumWriter;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.linkedin.avro.fastserde;

import org.apache.avro.Schema;
import org.testng.Assert;
import org.testng.annotations.Test;


/**
 * Checks that {@link FastDatumReaderWriterUtil} returns a non-null reader/writer and hands back
 * the very same instance when asked again with the same schema object(s).
 */
public class FastDatumReaderWriterUtilTest {

  /** JSON for the empty record schema used by every test. */
  private static final String EMPTY_RECORD_SCHEMA_JSON =
      "{\"type\": \"record\", \"name\": \"test_record\", \"fields\":[]}";

  /** Parses the shared JSON into a fresh Schema object on every call. */
  private static Schema parseEmptyRecordSchema() {
    return Schema.parse(EMPTY_RECORD_SCHEMA_JSON);
  }

  @Test (groups = "serializationTest")
  public void testIsSupportedForFastGenericDatumWriter() {
    Schema schema = parseEmptyRecordSchema();
    FastGenericDatumWriter first = FastDatumReaderWriterUtil.getFastGenericDatumWriter(schema);
    Assert.assertNotNull(first);
    // A second request with the same schema object must yield the identical writer.
    FastGenericDatumWriter second = FastDatumReaderWriterUtil.getFastGenericDatumWriter(schema);
    Assert.assertSame(first, second);
  }

  @Test (groups = "deserializationTest")
  public void testIsSupportedForFastGenericDatumReader() {
    // Writer and reader schemas are two distinct objects with the same content.
    Schema writerSchema = parseEmptyRecordSchema();
    Schema readerSchema = parseEmptyRecordSchema();
    FastGenericDatumReader first = FastDatumReaderWriterUtil.getFastGenericDatumReader(writerSchema, readerSchema);
    Assert.assertNotNull(first);
    FastGenericDatumReader second = FastDatumReaderWriterUtil.getFastGenericDatumReader(writerSchema, readerSchema);
    Assert.assertSame(first, second);
  }

  @Test (groups = "deserializationTest")
  public void testIsSupportedForFastGenericDatumReaderWithSameReaderWriterSchema() {
    Schema schema = parseEmptyRecordSchema();
    FastGenericDatumReader first = FastDatumReaderWriterUtil.getFastGenericDatumReader(schema);
    Assert.assertNotNull(first);
    FastGenericDatumReader second = FastDatumReaderWriterUtil.getFastGenericDatumReader(schema);
    Assert.assertSame(first, second);
  }

  @Test (groups = "serializationTest")
  public void testIsSupportedForFastSpecificDatumWriter() {
    Schema schema = parseEmptyRecordSchema();
    FastSpecificDatumWriter first = FastDatumReaderWriterUtil.getFastSpecificDatumWriter(schema);
    Assert.assertNotNull(first);
    // A second request with the same schema object must yield the identical writer.
    FastSpecificDatumWriter second = FastDatumReaderWriterUtil.getFastSpecificDatumWriter(schema);
    Assert.assertSame(first, second);
  }

  @Test (groups = "deserializationTest")
  public void testIsSupportedForFastSpecificDatumReader() {
    // Writer and reader schemas are two distinct objects with the same content.
    Schema writerSchema = parseEmptyRecordSchema();
    Schema readerSchema = parseEmptyRecordSchema();
    FastSpecificDatumReader first = FastDatumReaderWriterUtil.getFastSpecificDatumReader(writerSchema, readerSchema);
    Assert.assertNotNull(first);
    FastSpecificDatumReader second = FastDatumReaderWriterUtil.getFastSpecificDatumReader(writerSchema, readerSchema);
    Assert.assertSame(first, second);
  }

  @Test (groups = "deserializationTest")
  public void testIsSupportedForFastSpecificDatumReaderWithSameReaderWriterSchema() {
    Schema schema = parseEmptyRecordSchema();
    FastSpecificDatumReader first = FastDatumReaderWriterUtil.getFastSpecificDatumReader(schema);
    Assert.assertNotNull(first);
    FastSpecificDatumReader second = FastDatumReaderWriterUtil.getFastSpecificDatumReader(schema);
    Assert.assertSame(first, second);
  }

}

0 comments on commit a20fd47

Please sign in to comment.