From fef25c3b7ca438d4a432947921daa9876601fe14 Mon Sep 17 00:00:00 2001 From: Robert Newson Date: Tue, 3 Dec 2024 12:40:27 +0000 Subject: [PATCH] toggle for NonBlockingHash{Map,Set} --- src/main/scala/scalang/Node.scala | 56 ++++++++++++++----- src/main/scala/scalang/NodeConfig.scala | 3 +- .../util/concurrent/ConcurrentHashSet.scala | 36 ++++++++++++ 3 files changed, 81 insertions(+), 14 deletions(-) create mode 100644 src/main/scala/scalang/util/concurrent/ConcurrentHashSet.scala diff --git a/src/main/scala/scalang/Node.scala b/src/main/scala/scalang/Node.scala index c2d7400..146c585 100644 --- a/src/main/scala/scalang/Node.scala +++ b/src/main/scala/scalang/Node.scala @@ -19,7 +19,9 @@ import java.util.concurrent.atomic._ import org.jboss.{netty => netty} import netty.channel._ import java.util.concurrent._ +import java.util.{Set => JSet} import scalang.node._ +import scalang.util.concurrent.ConcurrentHashSet import org.jctools.maps._ import overlock.atomicmap._ import org.jetlang._ @@ -184,6 +186,34 @@ trait Node extends ClusterListener with ClusterPublisher { def timer : HashedWheelTimer } +object ErlangNode { + + def newConcurrentMap[K,V](config : NodeConfig): ConcurrentMap[K,V] = { + if (config.useNBHM) { + new NonBlockingHashMap[K,V] + } else { + new ConcurrentHashMap[K,V] + } + } + + def newAtomicMap[K,V](config : NodeConfig): AtomicMap[K,V] = { + if (config.useNBHM) { + AtomicMap.atomicNBHM[K,V] + } else { + AtomicMap.atomicCHM[K,V] + } + } + + def newConcurrentSet[E](config : NodeConfig): JSet[E] = { + if (config.useNBHM) { + new NonBlockingHashSet[E] + } else { + new ConcurrentHashSet[E] + } + } + +} + class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) extends Node with ExitListener with SendListener @@ -198,11 +228,11 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex val tickTime = config.tickTime val poolFactory = config.poolFactory var creation : Int = 0 - val processes = new NonBlockingHashMap[Pid,ProcessAdapter] - val registeredNames = new NonBlockingHashMap[Symbol,Pid] - val channels = AtomicMap.atomicNBHM[Symbol,Channel] - val links = AtomicMap.atomicNBHM[Channel,NonBlockingHashSet[Link]] - val monitors = AtomicMap.atomicNBHM[Channel,NonBlockingHashSet[Monitor]] + val processes = ErlangNode.newConcurrentMap[Pid,ProcessAdapter](config) + val registeredNames = ErlangNode.newConcurrentMap[Symbol,Pid](config) + val channels = ErlangNode.newAtomicMap[Symbol,Channel](config) + val links = ErlangNode.newAtomicMap[Channel,JSet[Link]](config) + val monitors = ErlangNode.newAtomicMap[Channel,JSet[Monitor]](config) val pidCount = new AtomicInteger(0) val pidSerial = new AtomicInteger(0) val executor = poolFactory.createActorPool @@ -493,7 +523,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex } } else { getOrConnectAndSend(to.node, LinkMessage(from, to), { channel => - val set = links.getOrElseUpdate(channel, new NonBlockingHashSet[Link]) + val set = links.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Link](config)) set.add(link) }) } @@ -537,14 +567,14 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex case Some(p : ProcessAdapter) => val link = p.registerLink(to) if (!isLocal(from)) - links.getOrElseUpdate(channel, new NonBlockingHashSet[Link]).add(link) + links.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Link](config)).add(link) case None => if (isLocal(from)) { log.warn("Try to link non-live process %s to %s", from, to) val link = Link(from, to) break(link, 'noproc) } else { - links.getOrElseUpdate(channel, new NonBlockingHashSet[Link]).add(Link(from, to)) + links.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Link](config)).add(Link(from, to)) } } @@ -552,7 +582,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex case Some(p : ProcessAdapter) => val link = p.registerLink(from) if (!isLocal(to)) - links.getOrElseUpdate(channel, new NonBlockingHashSet[Link]).add(link) + links.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Link](config)).add(link) case None => if (isLocal(to)) { @@ -560,7 +590,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex val link = Link(from, to) break(link, 'noproc) } else { - links.getOrElseUpdate(channel, new NonBlockingHashSet[Link]).add(Link(from, to)) + links.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Link](config)).add(Link(from, to)) } } } @@ -584,7 +614,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex } } else { getOrConnectAndSend(nodeOf(monitored), MonitorMessage(monitoring, monitored, ref), { channel => - val set = monitors.getOrElseUpdate(channel, new NonBlockingHashSet[Monitor]) + val set = monitors.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Monitor](config)) set.add(monitor) }) } @@ -611,14 +641,14 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex log.debug("adding monitor for %s", p) val monitor = p.registerMonitor(monitoring, ref) if (!isLocal(monitored)) - monitors.getOrElseUpdate(channel, new NonBlockingHashSet[Monitor]).add(monitor) + monitors.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Monitor](config)).add(monitor) case None => if (isLocal(monitored)) { log.warn("Try to monitor non-live process: %s -> %s (%s)", monitoring, monitored, ref) val monitor = Monitor(monitoring, monitored, ref) monitorExit(monitor, 'noproc) } else { - monitors.getOrElseUpdate(channel, new NonBlockingHashSet[Monitor]).add(Monitor(monitoring, monitored, ref)) + monitors.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Monitor](config)).add(Monitor(monitoring, monitored, ref)) } } } diff --git a/src/main/scala/scalang/NodeConfig.scala b/src/main/scala/scalang/NodeConfig.scala index 75cdd1d..dea35da 100644 --- a/src/main/scala/scalang/NodeConfig.scala +++ b/src/main/scala/scalang/NodeConfig.scala @@ -24,7 +24,8 @@ case class NodeConfig( typeFactory : TypeFactory = NoneTypeFactory, typeEncoder: TypeEncoder = NoneTypeEncoder, typeDecoder : TypeDecoder = NoneTypeDecoder, - tickTime : Int = 60) + tickTime : Int = 60, + useNBHM: Boolean = true) object NoneTypeFactory extends TypeFactory { def createType(name : Symbol, arity : Int, reader : TermReader) = None diff --git a/src/main/scala/scalang/util/concurrent/ConcurrentHashSet.scala b/src/main/scala/scalang/util/concurrent/ConcurrentHashSet.scala new file mode 100644 index 0000000..a2b2672 --- /dev/null +++ b/src/main/scala/scalang/util/concurrent/ConcurrentHashSet.scala @@ -0,0 +1,36 @@ +package scalang.util.concurrent; + +import java.util.AbstractSet; +import java.util.concurrent.ConcurrentHashMap; + +class ConcurrentHashSet[E] extends AbstractSet[E] { + + val V = ""; + + val map = new ConcurrentHashMap[E,Object] + + override def add(obj: E) = { + map.putIfAbsent(obj, V) == null; + } + + override def contains(obj: Object) = { + map.containsKey(obj); + } + + override def remove(obj: Object) = { + map.remove(obj) == V; + } + + override def size() = { + map.size(); + } + + override def clear() { + map.clear(); + } + + override def iterator() = { + map.keySet().iterator(); + } + +}