Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

toggle for NonBlockingHashMap #11

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 43 additions & 13 deletions src/main/scala/scalang/Node.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import java.util.concurrent.atomic._
import org.jboss.{netty => netty}
import netty.channel._
import java.util.concurrent._
import java.util.{Set => JSet}
import scalang.node._
import scalang.util.concurrent.ConcurrentHashSet
import org.jctools.maps._
import overlock.atomicmap._
import org.jetlang._
Expand Down Expand Up @@ -184,6 +186,34 @@ trait Node extends ClusterListener with ClusterPublisher {
def timer : HashedWheelTimer
}

object ErlangNode {

def newConcurrentMap[K,V](config : NodeConfig): ConcurrentMap[K,V] = {
if (config.useNBHM) {
new NonBlockingHashMap[K,V]
} else {
new ConcurrentHashMap[K,V]
}
}

def newAtomicMap[K,V](config : NodeConfig): AtomicMap[K,V] = {
if (config.useNBHM) {
AtomicMap.atomicNBHM[K,V]
} else {
AtomicMap.atomicCHM[K,V]
}
}

def newConcurrentSet[E](config : NodeConfig): JSet[E] = {
if (config.useNBHM) {
new NonBlockingHashSet[E]
} else {
new ConcurrentHashSet[E]
}
}

}

class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) extends Node
with ExitListener
with SendListener
Expand All @@ -198,11 +228,11 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
val tickTime = config.tickTime
val poolFactory = config.poolFactory
var creation : Int = 0
val processes = new NonBlockingHashMap[Pid,ProcessAdapter]
val registeredNames = new NonBlockingHashMap[Symbol,Pid]
val channels = AtomicMap.atomicNBHM[Symbol,Channel]
val links = AtomicMap.atomicNBHM[Channel,NonBlockingHashSet[Link]]
val monitors = AtomicMap.atomicNBHM[Channel,NonBlockingHashSet[Monitor]]
val processes = ErlangNode.newConcurrentMap[Pid,ProcessAdapter](config)
val registeredNames = ErlangNode.newConcurrentMap[Symbol,Pid](config)
val channels = ErlangNode.newAtomicMap[Symbol,Channel](config)
val links = ErlangNode.newAtomicMap[Channel,JSet[Link]](config)
val monitors = ErlangNode.newAtomicMap[Channel,JSet[Monitor]](config)
val pidCount = new AtomicInteger(0)
val pidSerial = new AtomicInteger(0)
val executor = poolFactory.createActorPool
Expand Down Expand Up @@ -493,7 +523,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
}
} else {
getOrConnectAndSend(to.node, LinkMessage(from, to), { channel =>
val set = links.getOrElseUpdate(channel, new NonBlockingHashSet[Link])
val set = links.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Link](config))
set.add(link)
})
}
Expand Down Expand Up @@ -537,30 +567,30 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
case Some(p : ProcessAdapter) =>
val link = p.registerLink(to)
if (!isLocal(from))
links.getOrElseUpdate(channel, new NonBlockingHashSet[Link]).add(link)
links.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Link](config)).add(link)
case None =>
if (isLocal(from)) {
log.warn("Try to link non-live process %s to %s", from, to)
val link = Link(from, to)
break(link, 'noproc)
} else {
links.getOrElseUpdate(channel, new NonBlockingHashSet[Link]).add(Link(from, to))
links.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Link](config)).add(Link(from, to))
}
}

process(to) match {
case Some(p : ProcessAdapter) =>
val link = p.registerLink(from)
if (!isLocal(to))
links.getOrElseUpdate(channel, new NonBlockingHashSet[Link]).add(link)
links.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Link](config)).add(link)

case None =>
if (isLocal(to)) {
log.warn("Try to link non-live process %s to %s", to, from)
val link = Link(from, to)
break(link, 'noproc)
} else {
links.getOrElseUpdate(channel, new NonBlockingHashSet[Link]).add(Link(from, to))
links.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Link](config)).add(Link(from, to))
}
}
}
Expand All @@ -584,7 +614,7 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
}
} else {
getOrConnectAndSend(nodeOf(monitored), MonitorMessage(monitoring, monitored, ref), { channel =>
val set = monitors.getOrElseUpdate(channel, new NonBlockingHashSet[Monitor])
val set = monitors.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Monitor](config))
set.add(monitor)
})
}
Expand All @@ -611,14 +641,14 @@ class ErlangNode(val name : Symbol, val cookie : String, config : NodeConfig) ex
log.debug("adding monitor for %s", p)
val monitor = p.registerMonitor(monitoring, ref)
if (!isLocal(monitored))
monitors.getOrElseUpdate(channel, new NonBlockingHashSet[Monitor]).add(monitor)
monitors.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Monitor](config)).add(monitor)
case None =>
if (isLocal(monitored)) {
log.warn("Try to monitor non-live process: %s -> %s (%s)", monitoring, monitored, ref)
val monitor = Monitor(monitoring, monitored, ref)
monitorExit(monitor, 'noproc)
} else {
monitors.getOrElseUpdate(channel, new NonBlockingHashSet[Monitor]).add(Monitor(monitoring, monitored, ref))
monitors.getOrElseUpdate(channel, ErlangNode.newConcurrentSet[Monitor](config)).add(Monitor(monitoring, monitored, ref))
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/scalang/NodeConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ case class NodeConfig(
typeFactory : TypeFactory = NoneTypeFactory,
typeEncoder: TypeEncoder = NoneTypeEncoder,
typeDecoder : TypeDecoder = NoneTypeDecoder,
tickTime : Int = 60)
tickTime : Int = 60,
useNBHM: Boolean = true)

object NoneTypeFactory extends TypeFactory {
def createType(name : Symbol, arity : Int, reader : TermReader) = None
Expand Down
36 changes: 36 additions & 0 deletions src/main/scala/scalang/util/concurrent/ConcurrentHashSet.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package scalang.util.concurrent;

import java.util.AbstractSet;
import java.util.concurrent.ConcurrentHashMap;

class ConcurrentHashSet[E] extends AbstractSet[E] {

val V = "";

val map = new ConcurrentHashMap[E,Object]

override def add(obj: E) = {
map.putIfAbsent(obj, V) == null;
}

override def contains(obj: Object) = {
map.containsKey(obj);
}

override def remove(obj: Object) = {
map.remove(obj) == V;
}

override def size() = {
map.size();
}

override def clear() {
map.clear();
}

override def iterator() = {
map.keySet().iterator();
}

}