Commit 14dfe19

1. Check multipleValuesMap to see if the value could be moved to singleValueMap
   during removeValues etc. This will reduce memory usage. KesqueCompactor.load() seems to be working now. #8
2. Only apply op to records at offset >= fromOffset during Kesque.readOnce

dcaoyuan committed Mar 28, 2019
1 parent 0420392 commit 14dfe19
Showing 6 changed files with 85 additions and 77 deletions.
23 changes: 6 additions & 17 deletions kesque/src/main/scala/kesque/HashKeyValueTable.scala
@@ -122,6 +122,8 @@ final class HashKeyValueTable private[kesque] (
info(s"Loading index of ${topics(col)}")
val start = System.nanoTime

// TODO shift indexTopics and shiftIndexTopics
val indexTopicsOfFileno = Array(indexTopics)
val initCounts = Array.fill[Int](indexTopicsOfFileno.length)(0)
val (_, counts) = indexTopicsOfFileno.foldLeft(0, initCounts) {
case ((fileno, counts), idxTps) =>
@@ -217,7 +219,7 @@ final class HashKeyValueTable private[kesque] (
val kafkaTopic = topicsOfFileno(fileno)(col) // kafka topic directory name
val (topicPartition, result) = db.read(kafkaTopic, offset, fetchMaxBytes).head
val recs = result.info.records.records.iterator
// NOTE: the records usally do not start from the fecth-offset,
// NOTE: the records usually do not start from the fetch-offset,
// the expected record may be near the tail of recs
//debug(s"======== $offset ${result.info.fetchOffsetMetadata} ")
while (recs.hasNext) {
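
A note on this loop: a Kafka fetch returns whole record batches, so the batch that satisfies the request may begin well before the requested offset, and the loop has to scan forward to the entry it actually wants. A minimal sketch of that scan, using the kafka Record iterator and kesque.getBytes as above (findByOffset itself is a hypothetical helper, not part of this file):

// Hypothetical helper: walk a fetched batch until the record at the
// requested offset is found; it may be near the tail of the batch.
def findByOffset(recs: java.util.Iterator[org.apache.kafka.common.record.Record], offset: Long): Option[Array[Byte]] = {
  while (recs.hasNext) {
    val rec = recs.next()
    if (rec.offset == offset) {
      return if (rec.hasValue) Some(kesque.getBytes(rec.value)) else None
    }
  }
  None // the batch did not contain the requested offset
}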
@@ -413,22 +415,7 @@ final class HashKeyValueTable private[kesque] (
writeLock.lock()

val col = topicToCol(topic)
val key = Hash(keyBytes)
val hash = key.hashCode
hashOffsets.get(hash, col) match {
case IntIntsMap.NO_VALUE =>
case mixedOffsets =>
var found = false
var i = 0
while (!found && i < mixedOffsets.length) {
if (mixedOffset == mixedOffsets(i)) {
hashOffsets.removeValue(hash, mixedOffset, col)
found = true
} else {
i += 1
}
}
}
hashOffsets.removeValue(Hash(keyBytes).hashCode, mixedOffset, col)
} finally {
writeLock.unlock()
}
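
The deleted scan was redundant: HashOffsets.removeValue (see HashOffsets.scala below) already searches the values stored under the hash and removes only an exact match, so removeIndexEntry can delegate unconditionally. A sketch of the equivalence, with names as above:

// Before: probe hashOffsets.get, linear-scan mixedOffsets for a match,
// then call removeValue on a hit. After: one call; removeValue performs
// the same membership check internally and is a no-op on a miss.
hashOffsets.removeValue(Hash(keyBytes).hashCode, mixedOffset, col)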
@@ -465,6 +452,8 @@ final class HashKeyValueTable private[kesque] (
}
}

def size = hashOffsets.size

def cacheHitRate(topic: String) = caches(topicToCol(topic)).hitRate
def cacheReadCount(topic: String) = caches(topicToCol(topic)).readCount
def resetCacheHitRate(topic: String) = caches(topicToCol(topic)).resetHitRate()
27 changes: 16 additions & 11 deletions kesque/src/main/scala/kesque/HashOffsets.scala
@@ -13,7 +13,8 @@ object HashOffsets {

var col = 0
while (col < 3) {
println(s"n is $col")
println(s"\n=== col $col ===")

// put i and -i
var i = -max
while (i <= max) {
@@ -130,13 +131,17 @@ final class HashOffsets(initSize: Int, nValues: Int = 1, fillFactor: Float = 0.7
try {
writeLock.lock()

multipleValuesMap.get(key, col) match {
case IntIntsMap.NO_VALUE =>
singleValueMap.get(key, col) match {
case IntIntMap.NO_VALUE => IntIntsMap.NO_VALUE
case existedValue => Array(singleValueMap.remove(key, col))
multipleValuesMap.removeValue(key, value, col) match {
case IntIntsMap.NO_VALUE => Array(singleValueMap.remove(key, col))
case removedValue =>
if (removedValue.length > 0) {
val leftValues = multipleValuesMap.get(key, col)
if (leftValues.length == 1) {
singleValueMap.put(key, leftValues(0), col)
multipleValuesMap.remove(key, col)
}
}
case _ => multipleValuesMap.removeValue(key, value, col)
removedValue
}
} finally {
writeLock.unlock()
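
The intent of the new branch order: keys holding a single value live in the cheaper singleValueMap (one int per key), and a key is demoted out of multipleValuesMap as soon as removal leaves exactly one value behind. A standalone sketch of that policy, simplified to drop the per-column dimension and locking:

// Simplified sketch of the demotion policy (no `col` dimension, no locks).
import scala.collection.mutable

val single   = mutable.Map[Int, Int]()        // keys holding exactly one value
val multiple = mutable.Map[Int, Array[Int]]() // keys holding two or more values

def removeValue(key: Int, value: Int): Unit =
  multiple.get(key) match {
    case Some(values) =>
      val left = values.filterNot(_ == value)
      if (left.length == 1) {    // only one value left: demote to the cheap tier
        multiple -= key
        single(key) = left(0)
      } else {
        multiple(key) = left
      }
    case None =>                 // not multi-valued; drop from the single tier on a match
      if (single.get(key).contains(value)) single -= key
  }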
@@ -156,8 +161,7 @@ final class HashOffsets(initSize: Int, nValues: Int = 1, fillFactor: Float = 0.7
case IntIntMap.NO_VALUE => Array(singleValueMap.put(key, toPut, col))
case existed =>
singleValueMap.remove(key, col)
multipleValuesMap.put(key, existed, col)
multipleValuesMap.replace(key, toRemove, toPut, col)
Array(singleValueMap.put(key, toPut, col))
}
case _ =>
multipleValuesMap.replace(key, toRemove, toPut, col)
@@ -189,12 +193,13 @@ final class HashOffsets(initSize: Int, nValues: Int = 1, fillFactor: Float = 0.7
try {
readLock.lock()

multipleValuesMap.removeValues(col)(cond)
// TODO should move value to singleValueMap if multipleValuesMap only has one value?
singleValueMap.removeValues(col)(cond)
multipleValuesMap.removeValues(col)(cond)
} finally {
readLock.unlock()
}
}

def size = singleValueMap.size + multipleValuesMap.size
def size = (singleValueMap.size, multipleValuesMap.size)
}
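
Returning the two sizes as a pair instead of their sum makes the intended migration visible at runtime: as keys are demoted from multipleValuesMap to singleValueMap, the first component should grow while the second shrinks. A usage sketch, with hashOffsets being any HashOffsets instance:

// The tuple shows how many keys sit in each tier.
val (nSingle, nMultiple) = hashOffsets.size
println(s"index keys: single=$nSingle, multiple=$nMultiple, total=${nSingle + nMultiple}")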
6 changes: 5 additions & 1 deletion kesque/src/main/scala/kesque/IntIntMap.scala
@@ -161,7 +161,11 @@ final class IntIntMap(initSize: Int, nValues: Int, fillFactor: Float = 0.75f) {
var ptr = getStartIndex(key)

if (key == FREE_KEY) {
return if (m_hasFreeKey(col)) m_freeValue(col) else NO_VALUE
return if (m_hasFreeKey(col)) {
m_freeValue(col)
} else {
NO_VALUE
}
}

var k = m_data(ptr)
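
Context for this hunk: key 0 doubles as FREE_KEY, the empty-slot marker of the open-addressing array, so its value cannot be stored in m_data and lives in the m_freeValue side slot instead. A minimal sketch of the convention, with field names as in this file (probeFor is a hypothetical stand-in for the probe loop that follows):

// Key 0 marks empty slots, so it is handled out of band.
def getSketch(key: Int, col: Int): Int =
  if (key == FREE_KEY) {
    if (m_hasFreeKey(col)) m_freeValue(col) else NO_VALUE
  } else {
    probeFor(key, col) // hypothetical: linear-probe m_data from getStartIndex(key)
  }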
8 changes: 4 additions & 4 deletions kesque/src/main/scala/kesque/IntIntsMap.scala
@@ -6,12 +6,12 @@ object IntIntsMap {
type V = Int // unsigned int could be 2^32 = 4,294,967,296

private val FREE_KEY = 0
/**
 * Used for return only.
 * Always put null to data[] when removing item instead of NO_VALUE (Array())
 * to release memory.
 */
val NO_VALUE = Array[V]()
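
The comment is about memory, not semantics: NO_VALUE is a single shared empty array used only as a return sentinel, while removed slots are set to null so the value arrays they referenced become garbage-collectable. A minimal sketch of the convention, with the layout simplified to one column:

// Store null on removal; hand NO_VALUE back only as a return sentinel.
val data = new Array[Array[V]](16)

def removeAt(idx: Int): Array[V] = {
  val old = data(idx)
  data(idx) = null // release the reference so `old` can be collected
  if (old == null) NO_VALUE else old
}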

// --- simple test
def main(args: Array[String]) {
40 changes: 22 additions & 18 deletions kesque/src/main/scala/kesque/Kesque.scala
@@ -124,36 +124,40 @@ final class Kesque(props: Properties) {
var nRead = 0
do {
readOnce(topic, offset, fetchMaxBytes)(op) match {
case (n, o) =>
nRead = n
offset = o + 1
case (count, lastOffset) =>
nRead = count
offset = lastOffset + 1
}
} while (nRead > 0)
}

/**
* @param topic
* @param fetchOffset
* @param op: action applied on (offset, key, value)
* @param fromOffset offset to start reading from
* @param op: action applied on TKeyVal(key, value, offset, timestamp)
* @return (number of records read, last offset read)
*/
private[kesque] def readOnce(topic: String, fetchOffset: Long, fetchMaxBytes: Int)(op: TKeyVal => Unit) = {
val (topicPartition, result) = read(topic, fetchOffset, fetchMaxBytes).head
private[kesque] def readOnce(topic: String, fromOffset: Long, fetchMaxBytes: Int)(op: TKeyVal => Unit) = {
val (topicPartition, result) = read(topic, fromOffset, fetchMaxBytes).head
val recs = result.info.records.records.iterator
var i = 0
var lastOffset = fetchOffset
var count = 0
var lastOffset = fromOffset
while (recs.hasNext) {
val rec = recs.next
val key = if (rec.hasKey) kesque.getBytes(rec.key) else null
val value = if (rec.hasValue) kesque.getBytes(rec.value) else null
val timestamp = rec.timestamp
val offset = rec.offset.toInt
op(TKeyVal(key, value, offset, timestamp))

lastOffset = offset
i += 1
val offset = rec.offset
if (offset >= fromOffset) {
val key = if (rec.hasKey) kesque.getBytes(rec.key) else null
val value = if (rec.hasValue) kesque.getBytes(rec.value) else null
val timestamp = rec.timestamp

op(TKeyVal(key, value, offset.toInt, timestamp))

lastOffset = offset
count += 1
}
}

(i, lastOffset)
(count, lastOffset)
}
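
The offset guard is what makes the do/while driver above safe: a fetch at fromOffset returns the whole batch containing that offset, so records below it would otherwise be handed to op again on every pass. A small illustration of the overlap the guard absorbs:

// A fetch at fromOffset = 102 may return the whole batch [100..103];
// the guard ensures only 102 and 103 reach op.
val batchOffsets = Seq(100L, 101L, 102L, 103L)
val fromOffset   = 102L
val processed    = batchOffsets.filter(_ >= fromOffset)
assert(processed == Seq(102L, 103L))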

def shutdown() {
58 changes: 32 additions & 26 deletions khipu-eth/src/main/scala/khipu/store/KesqueCompactor.scala
@@ -95,7 +95,8 @@ object KesqueCompactor {
if (nodeCount % 1000 == 0) {
val elapsed = (System.nanoTime - start) / 1000000000
val speed = nodeCount / math.max(1, elapsed)
log.info(s"[comp] $topic nodes $nodeCount $speed/s, at #$blockNumber")
log.info(s"[comp] $topic nodes $nodeCount $speed/s, at #$blockNumber, table size ${nodeTable.size}")
}

nodeGot(TKeyVal(key, bytes, mixedOffset, blockNumber))
@@ -117,41 +118,36 @@ object KesqueCompactor {
final class NodeWriter(topic: String, nodeTable: HashKeyValueTable, toFileNo: Int) {
private val buf = new mutable.ArrayBuffer[TKeyVal]()

private var _maxOffset = Int.MinValue
private var _maxOffset = Long.MinValue
def maxOffset = _maxOffset

/**
* Should flush() after all kv are written.
*/
def write(kv: TKeyVal) {
buf += kv
if (buf.size > 100) { // keep the batched size around 4096 (~ 32*100 bytes)
val kvs = buf map {
case TKeyVal(key, value, mixedOffset, timestamp) =>
val (_, offset) = HashKeyValueTable.toFileNoAndOffset(mixedOffset)
_maxOffset = math.max(_maxOffset, offset)
TKeyVal(key, value, offset, timestamp)
}
nodeTable.write(kvs, topic, toFileNo)

buf foreach {
case TKeyVal(key, _, mixedOffset, _) =>
nodeTable.removeIndexEntry(key, mixedOffset, topic)
}

buf.clear()
flush()
}
}

def flush() {
println(s"flush on $topic")
buf foreach {
case TKeyVal(key, _, mixedOffset, _) =>
nodeTable.removeIndexEntry(key, mixedOffset, topic)
}
println(s"flush on $topic, removed index")

val kvs = buf map {
case TKeyVal(key, value, mixedOffset, timestamp) =>
val (_, offset) = HashKeyValueTable.toFileNoAndOffset(mixedOffset)
_maxOffset = math.max(_maxOffset, offset)
TKeyVal(key, value, offset, timestamp)
}
println(s"flush on $topic, writing to $toFileNo")
nodeTable.write(kvs, topic, toFileNo)

buf foreach {
case TKeyVal(key, _, mixedOffset, _) =>
nodeTable.removeIndexEntry(key, mixedOffset, topic)
}
println(s"flush on $topic, writing done")

buf.clear()
}
@@ -250,12 +246,12 @@ final class KesqueCompactor(
}

def start() {
load()
loadSnapshot()
postAppend()
gc()
}

private def load() {
private def loadSnapshot() {
log.info(s"[comp] loading nodes of #$blockNumber")
for {
hash <- blockHeaderStorage.getBlockHash(blockNumber)
@@ -273,21 +269,31 @@ final class KesqueCompactor(
* should stop world during postAppend()
*/
private def postAppend() {
log.info(s"[comp] post append storage from offset ${storageWriter.maxOffset + 1} ...")
val storageTask = new Thread {
override def run() {
log.info(s"[comp] post append storage from offset ${storageWriter.maxOffset + 1} ...")
// TODO topic from fromFileNo
storageTable.iterateOver(storageWriter.maxOffset + 1, KesqueDataSource.storage) {
//storageTable.iterateOver(241714020, KesqueDataSource.storage) {
kv => storageWriter.write(kv)
}
storageWriter.flush()
log.info(s"[comp] post append storage done.")
}
}

log.info(s"[comp] post append account from offset ${accountWriter.maxOffset + 1} ...")
val accountTask = new Thread {
override def run() {
log.info(s"[comp] post append account from offset ${accountWriter.maxOffset + 1} ...")
// TODO topic from fromFileNo
accountTable.iterateOver(accountWriter.maxOffset + 1, KesqueDataSource.account) {
kv => accountWriter.write(kv)
//accountTable.iterateOver(109535465, KesqueDataSource.account, true) {
kv =>
log.info(s"[comp] account iterate on $kv")
accountWriter.write(kv)
}
accountWriter.flush()
log.info(s"[comp] post append account done.")
}
}
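
The hunk is truncated here, but for the storage and account passes to overlap, the two tasks presumably still need to be started and joined before gc() runs; a minimal sketch of that pattern, with storageTask and accountTask as above:

// Run both post-append passes concurrently, then wait for both.
storageTask.start()
accountTask.start()
storageTask.join()
accountTask.join()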
