Skip to content

Commit

Permalink
MBbloom-- module support
Browse files Browse the repository at this point in the history
  • Loading branch information
fengyoulin authored and suxb201 committed Oct 7, 2023
1 parent 001c336 commit a194c59
Show file tree
Hide file tree
Showing 2 changed files with 244 additions and 0 deletions.
239 changes: 239 additions & 0 deletions internal/rdb/types/mbbloom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
package types

import (
"RedisShake/internal/rdb/structure"
"io"
"strconv"
"unsafe"
)

// BloomObject holds one value of the "MBbloom--" module type. It is populated
// by LoadFromBuffer and converted into BF.LOADCHUNK commands by Rewrite.
type BloomObject struct {
// encver is the module encoding version. It has to be set before
// LoadFromBuffer is called, which uses it to decide which optional fields to
// read; Rewrite uses it to pick the header layout.
encver int
// key is the name of the key the value belongs to; it is the key argument of
// every command produced by Rewrite.
key string
// sb is the filter chain decoded by LoadFromBuffer.
sb chain
}

// chain is the in-memory form of a chain of bloom filters as decoded by
// BloomObject.LoadFromBuffer.
type chain struct {
// filters holds one entry per sub-filter, in the order they were read.
filters []link
// size is the first unsigned value of the payload and is copied unchanged into
// the dump header. NOTE(review): presumably the total item count of the
// chain; confirm against RedisBloom.
size uint64
// nfilters is the number of sub-filters announced by the payload; the loaders
// and encoders iterate filters up to this count.
nfilters uint64
// options is read only when encver >= BF_MIN_OPTIONS_ENC; otherwise it stays 0.
options uint64
// growth is read only when encver >= BF_MIN_GROWTH_ENC; otherwise it is set to 2.
growth uint64
}

// link is one element of a chain: a bloom filter plus a per-link size value.
type link struct {
// inner is the bloom filter of this link.
inner bloom
// size is the unsigned value read right after the filter bytes; it is written
// to the size field of the link record in the dump header.
// NOTE(review): presumably the number of items in this filter; confirm.
size uint64
}

// bloom carries the parameters and raw bytes of a single bloom filter as read
// from the payload by BloomObject.LoadFromBuffer.
type bloom struct {
// hashes is the third value of a filter record (after entries and err).
hashes uint64
// n2 is read only when encver != 0 and is narrowed to 8 bits in the dump header.
n2 uint64
// entries is the first value of a filter record.
entries uint64
// err is the first floating-point value of a filter record.
err float64
// bpe is the second floating-point value of a filter record; for encver 0 it
// is multiplied with entries to derive bits.
bpe float64
// bf holds the raw filter bytes. Its length is reported as "bytes" in the dump
// header and its content is emitted in chunks by getEncodedChunk.
bf string
// bits is read from the payload when encver != 0, otherwise computed as
// uint64(float64(entries) * bpe).
bits uint64
}

// dumpedChainHeader describes the fixed-size record at the start of the blob
// produced by getEncodedHeader. Only the first DUMPED_CHAIN_HEADER_SIZE bytes
// (the sum of the field widths) belong to the blob.
type dumpedChainHeader struct {
// size mirrors chain.size.
size uint64
// nfilters mirrors chain.nfilters, narrowed to 32 bits.
nfilters uint32
// options mirrors chain.options, narrowed to 32 bits.
options uint32
// growth mirrors chain.growth, narrowed to 32 bits.
growth uint32
}

// dumpedChainLink describes one per-filter record following the header in the
// blob produced by getEncodedHeader. A record occupies DUMPED_CHAIN_LINK_SIZE
// bytes in the blob (the sum of the field widths, without trailing padding).
type dumpedChainLink struct {
// bytes is the length of the filter's raw data (len of bloom.bf).
bytes uint64
// bits mirrors bloom.bits.
bits uint64
// size mirrors link.size.
size uint64
// err mirrors bloom.err.
err float64
// bpe mirrors bloom.bpe.
bpe float64
// hashes mirrors bloom.hashes, narrowed to 32 bits.
hashes uint32
// entries and enthigh are adjacent 32-bit slots: getEncodedHeader stores the
// full 64-bit bloom.entries across the 8 bytes that start at entries.
entries uint32
enthigh uint32
// n2 mirrors bloom.n2, narrowed to 8 bits.
n2 uint8
}

// dumpedChainHeaderV3 describes the fixed-size record at the start of the blob
// produced by getEncodedHeaderV3, which Rewrite selects when
// encver < BF_MIN_GROWTH_ENC. Unlike dumpedChainHeader it has no growth field
// and occupies DUMPED_CHAIN_HEADER_SIZE_V3 bytes.
type dumpedChainHeaderV3 struct {
// size mirrors chain.size.
size uint64
// nfilters mirrors chain.nfilters, narrowed to 32 bits.
nfilters uint32
// options mirrors chain.options, narrowed to 32 bits.
options uint32
}

// dumpedChainLinkV3 describes one per-filter record following the header in
// the blob produced by getEncodedHeaderV3. A record occupies
// DUMPED_CHAIN_LINK_SIZE_V3 bytes in the blob. Unlike dumpedChainLink the
// entry count is a single 32-bit field.
type dumpedChainLinkV3 struct {
// bytes is the length of the filter's raw data (len of bloom.bf).
bytes uint64
// bits mirrors bloom.bits.
bits uint64
// size mirrors link.size.
size uint64
// err mirrors bloom.err.
err float64
// bpe mirrors bloom.bpe.
bpe float64
// hashes mirrors bloom.hashes, narrowed to 32 bits.
hashes uint32
// entries mirrors bloom.entries, narrowed to 32 bits.
entries uint32
// n2 mirrors bloom.n2, narrowed to 8 bits.
n2 uint8
}

// Encoding-version thresholds compared against BloomObject.encver.
const (
// BF_MIN_OPTIONS_ENC is the lowest encver whose payload contains the chain
// options field.
BF_MIN_OPTIONS_ENC = 2
// BF_MIN_GROWTH_ENC is the lowest encver whose payload contains the chain
// growth field. Rewrite also uses it to choose between getEncodedHeader and
// getEncodedHeaderV3.
BF_MIN_GROWTH_ENC = 4
)

// Sizes, in bytes, of the records inside the encoded header blobs. Each value
// equals the sum of the field widths of the matching struct, i.e. the blob has
// no padding between or after fields, so a value can be smaller than the size
// Go assigns to the struct itself.
const (
// DUMPED_CHAIN_HEADER_SIZE is the blob size of dumpedChainHeader (8+4+4+4).
DUMPED_CHAIN_HEADER_SIZE = 20
// DUMPED_CHAIN_LINK_SIZE is the blob size of dumpedChainLink (5*8+3*4+1).
DUMPED_CHAIN_LINK_SIZE = 53
// DUMPED_CHAIN_HEADER_SIZE_V3 is the blob size of dumpedChainHeaderV3 (8+4+4).
DUMPED_CHAIN_HEADER_SIZE_V3 = 16
// DUMPED_CHAIN_LINK_SIZE_V3 is the blob size of dumpedChainLinkV3 (5*8+2*4+1).
DUMPED_CHAIN_LINK_SIZE_V3 = 49
)

// MAX_SCANDUMP_SIZE is the upper bound, in bytes, on the filter data carried
// by a single data chunk that Rewrite requests from getEncodedChunk.
const MAX_SCANDUMP_SIZE = 10485760 // 10 MiB (10 * 1024 * 1024)

// LoadFromBuffer decodes one MBbloom-- module value from rd into o.
//
// Values are consumed in payload order: chain size, filter count, options
// (only when encver >= BF_MIN_OPTIONS_ENC), growth (only when
// encver >= BF_MIN_GROWTH_ENC, otherwise 2), one record per filter, and
// finally the module EOF marker. o.encver must already be set by the caller.
// key is stored as the object's key; typeByte is not used.
//
// A numeric field that cannot be parsed makes readUnsigned/readDouble panic;
// in that case o.sb is left untouched because it is only assigned once the
// whole chain has been read.
func (o *BloomObject) LoadFromBuffer(rd io.Reader, key string, typeByte byte) {
	o.key = key

	// growth defaults to 2 for encodings that predate the growth field.
	loaded := chain{growth: 2}
	loaded.size = readUnsigned(rd)
	loaded.nfilters = readUnsigned(rd)
	if o.encver >= BF_MIN_OPTIONS_ENC {
		loaded.options = readUnsigned(rd)
	}
	if o.encver >= BF_MIN_GROWTH_ENC {
		loaded.growth = readUnsigned(rd)
	}

	for remaining := loaded.nfilters; remaining > 0; remaining-- {
		var filter bloom
		filter.entries = readUnsigned(rd)
		filter.err = readDouble(rd)
		filter.hashes = readUnsigned(rd)
		filter.bpe = readDouble(rd)
		if o.encver != 0 {
			filter.bits = readUnsigned(rd)
			filter.n2 = readUnsigned(rd)
		} else {
			// Encoding 0 does not store the bit count; derive it.
			filter.bits = uint64(float64(filter.entries) * filter.bpe)
		}
		filter.bf = structure.ReadModuleString(rd)

		// The per-link size follows the raw filter bytes.
		linkSize := readUnsigned(rd)
		loaded.filters = append(loaded.filters, link{inner: filter, size: linkSize})
	}

	o.sb = loaded
	structure.ReadModuleEof(rd)
}

// readUnsigned reads one module-encoded unsigned value from rd and returns it
// as a uint64. structure.ReadModuleUnsigned hands the value back as text,
// which is parsed as a base-10 number; text that does not fit a uint64
// results in a panic carrying the strconv error.
func readUnsigned(rd io.Reader) uint64 {
	text := structure.ReadModuleUnsigned(rd)
	value, parseErr := strconv.ParseUint(text, 10, 64)
	if parseErr == nil {
		return value
	}
	panic(parseErr)
}

// readDouble reads one module-encoded double from rd and returns it as a
// float64. structure.ReadModuleDouble hands the value back as text, which is
// parsed with 64-bit precision; unparsable text results in a panic carrying
// the strconv error.
func readDouble(rd io.Reader) float64 {
	text := structure.ReadModuleDouble(rd)
	value, parseErr := strconv.ParseFloat(text, 64)
	if parseErr == nil {
		return value
	}
	panic(parseErr)
}

// Rewrite converts the loaded filter chain into the sequence of BF.LOADCHUNK
// commands that recreates it under o.key.
//
// The first command always carries the encoded header with iterator "1"; the
// header layout is the V3 one when encver < BF_MIN_GROWTH_ENC. Every following
// command carries at most MAX_SCANDUMP_SIZE bytes of raw filter data and is
// tagged with the iterator value as advanced by getEncodedChunk for that
// chunk. The sequence ends when getEncodedChunk reports no more data.
func (o *BloomObject) Rewrite() []RedisCmd {
	encodeHeader := getEncodedHeader
	if o.encver < BF_MIN_GROWTH_ENC {
		encodeHeader = getEncodedHeaderV3
	}

	cmds := []RedisCmd{
		RedisCmd{"BF.LOADCHUNK", o.key, "1", encodeHeader(&o.sb)},
	}

	for iter := uint64(1); ; {
		chunk := getEncodedChunk(&o.sb, &iter, MAX_SCANDUMP_SIZE)
		if chunk == "" {
			return cmds
		}
		// iter has already been moved past this chunk by getEncodedChunk.
		cmds = append(cmds, RedisCmd{"BF.LOADCHUNK", o.key, strconv.FormatUint(iter, 10), chunk})
	}
}

// getEncodedHeader serializes sb into the header blob that Rewrite sends as
// the first BF.LOADCHUNK payload for encodings that carry a growth field.
//
// The blob is DUMPED_CHAIN_HEADER_SIZE bytes of chain header followed by
// sb.nfilters records of DUMPED_CHAIN_LINK_SIZE bytes, one per entry of
// sb.filters, with no padding anywhere. Multi-byte fields use the byte order
// of the host, and counters wider than their slot are truncated (nfilters,
// options, growth and hashes to 32 bits, n2 to 8 bits). It panics if
// sb.filters has fewer than sb.nfilters entries.
//
// Each record is assembled in a properly aligned local struct and then copied
// byte-wise into the blob. Pointing a struct pointer directly into the blob is
// not valid here: link records start at offsets that are not multiples of 8,
// and Go pads the struct to 56 bytes, so such a pointer for the last record
// would describe memory beyond the end of the buffer.
func getEncodedHeader(sb *chain) string {
	buf := make([]byte, DUMPED_CHAIN_LINK_SIZE*sb.nfilters+DUMPED_CHAIN_HEADER_SIZE)

	hdr := dumpedChainHeader{
		size:     sb.size,
		nfilters: uint32(sb.nfilters),
		options:  uint32(sb.options),
		growth:   uint32(sb.growth),
	}
	copy(buf[:DUMPED_CHAIN_HEADER_SIZE], (*[DUMPED_CHAIN_HEADER_SIZE]byte)(unsafe.Pointer(&hdr))[:])

	for i := uint64(0); i < sb.nfilters; i++ {
		src := &sb.filters[i]
		rec := dumpedChainLink{
			bytes:  uint64(len(src.inner.bf)),
			bits:   src.inner.bits,
			size:   src.size,
			err:    src.inner.err,
			bpe:    src.inner.bpe,
			hashes: uint32(src.inner.hashes),
			n2:     uint8(src.inner.n2),
		}

		start := DUMPED_CHAIN_HEADER_SIZE + DUMPED_CHAIN_LINK_SIZE*i
		slot := buf[start : start+DUMPED_CHAIN_LINK_SIZE]
		copy(slot, (*[DUMPED_CHAIN_LINK_SIZE]byte)(unsafe.Pointer(&rec))[:])

		// The entry count is 64 bits wide in the blob and spans the adjacent
		// entries/enthigh slots; copy its bytes from an aligned local so the
		// host byte order is kept without an unaligned 8-byte store.
		entries := src.inner.entries
		copy(slot[unsafe.Offsetof(rec.entries):], (*[8]byte)(unsafe.Pointer(&entries))[:])
	}
	return string(buf)
}

// getEncodedHeaderV3 serializes sb into the header blob that Rewrite sends as
// the first BF.LOADCHUNK payload when encver < BF_MIN_GROWTH_ENC.
//
// The blob is DUMPED_CHAIN_HEADER_SIZE_V3 bytes of chain header (no growth
// field) followed by sb.nfilters records of DUMPED_CHAIN_LINK_SIZE_V3 bytes,
// one per entry of sb.filters, with no padding anywhere. Multi-byte fields use
// the byte order of the host, and counters wider than their slot are truncated
// (nfilters, options, hashes and entries to 32 bits, n2 to 8 bits). It panics
// if sb.filters has fewer than sb.nfilters entries.
//
// Each record is assembled in a properly aligned local struct and then copied
// byte-wise into the blob. Pointing a struct pointer directly into the blob is
// not valid here: link records start at offsets that are not multiples of 8,
// and Go pads the struct to 56 bytes, so such a pointer for the last record
// would describe memory beyond the end of the buffer.
func getEncodedHeaderV3(sb *chain) string {
	buf := make([]byte, DUMPED_CHAIN_LINK_SIZE_V3*sb.nfilters+DUMPED_CHAIN_HEADER_SIZE_V3)

	hdr := dumpedChainHeaderV3{
		size:     sb.size,
		nfilters: uint32(sb.nfilters),
		options:  uint32(sb.options),
	}
	copy(buf[:DUMPED_CHAIN_HEADER_SIZE_V3], (*[DUMPED_CHAIN_HEADER_SIZE_V3]byte)(unsafe.Pointer(&hdr))[:])

	for i := uint64(0); i < sb.nfilters; i++ {
		src := &sb.filters[i]
		rec := dumpedChainLinkV3{
			bytes:   uint64(len(src.inner.bf)),
			bits:    src.inner.bits,
			size:    src.size,
			err:     src.inner.err,
			bpe:     src.inner.bpe,
			hashes:  uint32(src.inner.hashes),
			entries: uint32(src.inner.entries),
			n2:      uint8(src.inner.n2),
		}

		start := DUMPED_CHAIN_HEADER_SIZE_V3 + DUMPED_CHAIN_LINK_SIZE_V3*i
		copy(buf[start:start+DUMPED_CHAIN_LINK_SIZE_V3], (*[DUMPED_CHAIN_LINK_SIZE_V3]byte)(unsafe.Pointer(&rec))[:])
	}
	return string(buf)
}

// getEncodedChunk returns the next piece of raw filter data for the iterator
// position *curIter and advances *curIter by the length of that piece.
//
// A piece never crosses a filter boundary and never exceeds maxChunkSize
// bytes. When the position lies past the last filter, *curIter is reset to 0
// and the empty string is returned.
func getEncodedChunk(sb *chain, curIter *uint64, maxChunkSize uint64) string {
	target, start := getLinkPos(sb, *curIter)
	if target == nil {
		*curIter = 0
		return ""
	}

	data := target.inner.bf
	// Take whatever is left of this filter, capped at the chunk limit.
	n := uint64(len(data)) - start
	if n > maxChunkSize {
		n = maxChunkSize
	}

	*curIter += n
	return data[start : start+n]
}

// getLinkPos maps the iterator value curIter onto the concatenation of all
// filters' raw data. Iterator 1 designates the first byte of the first
// filter. It returns the link containing the designated byte together with
// the byte's offset inside that link's data, or (nil, 0) when the position
// lies beyond the last filter (which includes curIter == 0, as the decrement
// wraps around to the largest uint64).
func getLinkPos(sb *chain, curIter uint64) (pl *link, offset uint64) {
	pos := curIter - 1

	var consumed uint64 // total length of the filters already skipped
	for idx := uint64(0); idx < sb.nfilters; idx++ {
		end := consumed + uint64(len(sb.filters[idx].inner.bf))
		if end > pos {
			return &sb.filters[idx], pos - consumed
		}
		consumed = end
	}
	return nil, 0
}
5 changes: 5 additions & 0 deletions internal/rdb/types/module2.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ func PareseModuleType(rd io.Reader, key string, typeByte byte) ModuleObject {
o := new(TairZsetObject)
o.LoadFromBuffer(rd, key, typeByte)
return o
case "MBbloom--":
o := new(BloomObject)
o.encver = int(moduleId & 1023)
o.LoadFromBuffer(rd, key, typeByte)
return o
default:
log.Panicf("unsupported module type: %s", moduleName)
return nil
Expand Down

0 comments on commit a194c59

Please sign in to comment.