Skip to content

Commit

Permalink
Implement WeakIdentityHashMap in avro-fastserde
Browse files Browse the repository at this point in the history
  • Loading branch information
Bingfeng Xia authored and FelixGV committed Apr 4, 2020
1 parent 9e8ad68 commit 1eb7931
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 5 deletions.
1 change: 0 additions & 1 deletion avro-fastserde/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ dependencies {
compile "org.apache.commons:commons-lang3:3.4"
compile "com.sun.codemodel:codemodel:2.6"
compile "com.google.guava:guava:19.0"
compile "org.jboss:jboss-common-core:2.5.0.Final"

// By default, the compile and testCompile configuration is using avro-1.8, and
// if you need to switch to an old version of Avro, we need to make
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.avro.Schema;
import org.jboss.util.collection.WeakIdentityHashMap;

/**
* Utility class to generate and reuse FastDatumReader/Writer. The cache key is schema object.
Expand Down Expand Up @@ -49,10 +47,10 @@ public int hashCode() {

//TODO : LRU cache
private static final Map<SchemaPair, FastGenericDatumReader<?>> fastGenericDatumReaderCache = new FastAvroConcurrentHashMap<>();
private static final Map<Schema, FastGenericDatumWriter<?>> fastGenericDatumWriterCache = new WeakIdentityHashMap();
private static final Map<Schema, FastGenericDatumWriter<?>> fastGenericDatumWriterCache = new WeakIdentityHashMap<>();

private static final Map<SchemaPair, FastSpecificDatumReader<?>> fastSpecificDatumReaderCache = new FastAvroConcurrentHashMap<>();
private static final Map<Schema, FastSpecificDatumWriter<?>> fastSpecificDatumWriterCache = new WeakIdentityHashMap();
private static final Map<Schema, FastSpecificDatumWriter<?>> fastSpecificDatumWriterCache = new WeakIdentityHashMap<>();

private FastDatumReaderWriterUtil() {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.linkedin.avro.fastserde;

import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* Implements a combination of WeakHashMap and IdentityHashMap. Useful for
* caches that need to key off of a == comparison instead of a .equals.
*
* <b> This class is not a general-purpose Map implementation! While this class
* implements the Map interface, it intentionally violates Map's general
* contract, which mandates the use of the equals method when comparing objects.
* This class is designed for use only in the rare cases wherein
* reference-equality semantics are required.
*
* Note that this implementation is not synchronized. </b>
*/
public class WeakIdentityHashMap<K, V> implements Map<K, V> {
private final ReferenceQueue<K> queue = new ReferenceQueue<>();
private Map<IdentityWeakReference, V> backingStore = new HashMap<>();

public WeakIdentityHashMap() {
}

@Override
public void clear() {
backingStore.clear();
reap();
}

@Override
public boolean containsKey(Object key) {
reap();
return backingStore.containsKey(new IdentityWeakReference(key));
}

@Override
public boolean containsValue(Object value) {
reap();
return backingStore.containsValue(value);
}

@Override
public Set<Map.Entry<K, V>> entrySet() {
reap();
Set<Map.Entry<K, V>> ret = new HashSet<>();
for (Map.Entry<IdentityWeakReference, V> ref : backingStore.entrySet()) {
final K key = ref.getKey().get();
final V value = ref.getValue();
Map.Entry<K, V> entry = new Map.Entry<K, V>() {
@Override
public K getKey() {
return key;
}

@Override
public V getValue() {
return value;
}

@Override
public V setValue(V value) {
throw new UnsupportedOperationException();
}
};
ret.add(entry);
}
return Collections.unmodifiableSet(ret);
}

@Override
public Set<K> keySet() {
reap();
Set<K> ret = new HashSet<>();
for (IdentityWeakReference ref : backingStore.keySet()) {
ret.add(ref.get());
}
return Collections.unmodifiableSet(ret);
}

@Override
public boolean equals(Object o) {
if (!(o instanceof WeakIdentityHashMap)) {
return false;
}
return backingStore.equals(((WeakIdentityHashMap) o).backingStore);
}

@Override
public V get(Object key) {
reap();
return backingStore.get(new IdentityWeakReference(key));
}

@Override
public V put(K key, V value) {
reap();
return backingStore.put(new IdentityWeakReference(key), value);
}

@Override
public int hashCode() {
reap();
return backingStore.hashCode();
}

@Override
public boolean isEmpty() {
reap();
return backingStore.isEmpty();
}

@Override
public void putAll(Map t) {
throw new UnsupportedOperationException();
}

@Override
public V remove(Object key) {
reap();
return backingStore.remove(new IdentityWeakReference(key));
}

@Override
public int size() {
reap();
return backingStore.size();
}

@Override
public Collection<V> values() {
reap();
return backingStore.values();
}

private synchronized void reap() {
Object zombie = queue.poll();

while (zombie != null) {
IdentityWeakReference victim = (IdentityWeakReference) zombie;
backingStore.remove(victim);
zombie = queue.poll();
}
}

class IdentityWeakReference extends WeakReference<K> {
int hash;

@SuppressWarnings("unchecked")
IdentityWeakReference(Object obj) {
super((K) obj, queue);
hash = System.identityHashCode(obj);
}

@Override
public int hashCode() {
return hash;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof WeakIdentityHashMap.IdentityWeakReference)) {
return false;
}
IdentityWeakReference ref = (IdentityWeakReference) o;
return this.get() == ref.get();
}
}
}

0 comments on commit 1eb7931

Please sign in to comment.