forked from finos/vuu
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
251 changed files
with
6,394 additions
and
3,732 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
76 changes: 76 additions & 0 deletions
76
toolbox/src/main/scala/org/finos/toolbox/CoalescingPriorityQueueImpl.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
package org.finos.toolbox | ||
|
||
import java.util.PriorityQueue | ||
|
||
class CoalescingPriorityQueueImpl[VALUE <: AnyRef, KEY](fn: VALUE => KEY, merge: (VALUE, VALUE) => VALUE, compareEqFun: (KEY, KEY) => Int) extends CoalescingQueue[VALUE, KEY] { | ||
|
||
case class PrioritizedItem[KEY](KEY: KEY, highPriority: Boolean) | ||
|
||
private val keysInOrder = new PriorityQueue[PrioritizedItem[KEY]]((o1: PrioritizedItem[KEY], o2: PrioritizedItem[KEY]) => { | ||
if (o1.highPriority && !o2.highPriority) { | ||
-1 | ||
} else if (!o1.highPriority && o2.highPriority) { | ||
1 | ||
} else { | ||
compareEqFun(o1.KEY, o2.KEY) | ||
} | ||
}) | ||
|
||
private val values = new java.util.HashMap[KEY, VALUE]() | ||
private val lock = new Object | ||
|
||
def length: Int = keysInOrder.size | ||
|
||
def push(item: VALUE) = { | ||
lock.synchronized { | ||
enqueue(fn(item), item, false) | ||
} | ||
} | ||
|
||
override def pushHighPriority(item: VALUE): Unit = { | ||
lock.synchronized { | ||
enqueue(fn(item), item, true) | ||
} | ||
} | ||
|
||
private def enqueue(key: KEY, value: VALUE, highPriority: Boolean) = { | ||
lock.synchronized { | ||
values.get(key) match { | ||
case null => | ||
keysInOrder.add(PrioritizedItem(key, highPriority)) | ||
values.put(key, value) | ||
case old => | ||
val merged = merge(old, value) | ||
if (merged ne old) { | ||
values.put(key, merged) | ||
} | ||
} | ||
} | ||
} | ||
|
||
def isEmpty() = lock.synchronized { | ||
values.isEmpty | ||
} | ||
|
||
private def dequeue: VALUE = { | ||
lock.synchronized { | ||
val key = keysInOrder.poll() | ||
values.remove(key.KEY) | ||
} | ||
} | ||
|
||
|
||
def popUpTo(i: Int): Seq[VALUE] = { | ||
lock.synchronized { | ||
val entries = for (i <- 0 until i if keysInOrder.size > 0) yield keysInOrder.poll() | ||
entries.map(x => values.remove(x.KEY)) | ||
} | ||
} | ||
|
||
def pop(): VALUE = dequeue | ||
|
||
def popOption(): Option[VALUE] = { | ||
val v = dequeue | ||
Option(v) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
66 changes: 66 additions & 0 deletions
66
toolbox/src/test/scala/org/finos/toolbox/CoalescingPriorityQueueTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
package org.finos.toolbox | ||
|
||
import org.scalatest.GivenWhenThen | ||
import org.scalatest.featurespec.AnyFeatureSpec | ||
import org.scalatest.matchers.should.Matchers | ||
|
||
class CoalescingPriorityQueueTest extends AnyFeatureSpec with Matchers with GivenWhenThen{ | ||
|
||
Feature("check prioritized queue") { | ||
|
||
Scenario("check queue") { | ||
|
||
case class QueueEntry(int: Integer, text: String) | ||
|
||
val queue = new CoalescingPriorityQueueImpl[QueueEntry, Integer](e => e.int, (x, y) => y, (key1, key2) => { | ||
key1.compareTo(key2) | ||
}) | ||
|
||
queue.push(QueueEntry(0, "foo")) | ||
queue.push(QueueEntry(1, "bar")) | ||
queue.pushHighPriority(QueueEntry(2, "ping")) | ||
queue.pushHighPriority(QueueEntry(3, "pong")) | ||
|
||
queue.pop().int should equal(2) | ||
queue.pop().int should equal(3) | ||
queue.pop().int should equal(0) | ||
queue.pop().int should equal(1) | ||
|
||
queue.push(QueueEntry(4, "foo")) | ||
queue.push(QueueEntry(5, "bar")) | ||
queue.pushHighPriority(QueueEntry(6, "ping")) | ||
|
||
val dequeued = queue.popUpTo(10) | ||
|
||
dequeued.size should be(3) | ||
dequeued(0).int should equal(6) | ||
dequeued(1).int should equal(4) | ||
dequeued(2).int should equal(5) | ||
|
||
queue.pushHighPriority(QueueEntry(7, "ping")) | ||
queue.pushHighPriority(QueueEntry(8, "ping")) | ||
queue.pushHighPriority(QueueEntry(9, "ping")) | ||
queue.pushHighPriority(QueueEntry(10, "ping")) | ||
|
||
val dequeued2 = queue.popUpTo(10) | ||
|
||
dequeued2.size should be(4) | ||
dequeued2(0).int should equal(7) | ||
dequeued2(1).int should equal(8) | ||
dequeued2(2).int should equal(9) | ||
dequeued2(3).int should equal(10) | ||
|
||
queue.push(QueueEntry(11, "foo")) | ||
queue.push(QueueEntry(12, "bar")) | ||
queue.push(QueueEntry(13, "bar")) | ||
|
||
val dequeued3 = queue.popUpTo(10) | ||
|
||
dequeued3.size should be(3) | ||
dequeued3(0).int should equal(11) | ||
dequeued3(1).int should equal(12) | ||
dequeued3(2).int should equal(13) | ||
} | ||
} | ||
|
||
} |
Oops, something went wrong.