English | 简体中文
NutsDB是纯Go语言编写一个简单、高性能、内嵌型、持久化的key-value数据库。
NutsDB支持事务,但目前版本不完全支持ACID的特性(参见限制和警告)。所有的操作都在事务中执行。NutsDB从v0.2.0版本开始支持多种数据结构,如列表(list)、集合(set)、有序集合(sorted set)。
目前项目还在试验阶段状态,没有经过实际生产环境考验。欢迎提issue,贡献!
我想找一个用纯go编写,尽量简单(方便二次开发、研究)、高性能(读写都能快一点)、内嵌型的(减少网络开销)数据库,最好支持事务。因为我觉得对于数据库而言,数据完整性很重要。如果能像Redis一样支持多种数据结构就更好了。 而像Redis一般用作缓存,对于事务支持也很弱。
找到几个备选项:
- BoltDB
BoltDB是一个基于B+ tree,有着非常好的读性能,还支持很实用的特性:范围扫描和按照前缀进行扫描。有很多项目采用了他。虽然现在官方不维护,由etcd团队在维护 他也支持ACID事务,但是他的写性能不是很好。如果对写性能要求不高也值得尝试。
- GoLevelDB
GoLevelDB是google开源的leveldb的go语言版本的实现。他的性能很高,特别是写性能,据官方c++版本说可以到40w+次写/秒,他基于LSM tree实现。他不支持事务。
- Badger
Badger同样是基于LSM tree,不同的是他把key/value分离。据他官网描述是基于为SSD优化。同是他也支持事务。但是我自己测试发现他的写性能没我想象中高。
对于如何实现kv数据库的好奇心吧。数据库可以说是系统的核心,了解数据库的内核或者自己有实现,对更好的用轮子或者下次根据业务定制轮子都很有帮助。
基于以上两点,我决定尝试开发一个简单的kv数据库,性能要好,功能也要强大(至少他们好的功能特性都要继承)。
如上面的选项,我发现大致基于存储引擎的模型分:B+ tree和LSM tree。基于B+ tree的模型相对后者成熟。一般使用覆盖页的方式和WAL(预写日志)来作崩溃恢复。而LSM tree的模型他是先写log文件,然后在写入MemTable内存中,当一定的时候写回SSTable,文件会越来越多,于是他一般作法是在后台进行合并和压缩操作。 一般来说,基于B+ tree的模型写性能不如LSM tree的模型。而在读性能上比LSM tree的模型要来得好。当然LSM tree的模型也可以优化,比如引入BloomFilter。 但是这些模型还是太复杂了。我喜欢简单,简单意味着好实现,好维护,相对不容易出错。
直到我找到bitcask这种模型,他其实本质上也算LSM tree的范畴吧。 他模型非常简单很好理解和实现,很快我就实现了一个版本。但是他的缺点是不支持范围扫描。我尝试去优化他,又开发一个版本,基于B+ tree作为索引,满足了范围扫描的问题 ,读性能是够了,写性能很一般,又用mmap和对原模型作了精简,这样又实现了一版。写性能又提高了几十倍。现在这个版本基本上都实现上面提到的数据库的一些有用的特性,包括支持范围扫描和前缀扫描、包括支持bucket、事务等,还支持了更多的数据结构(list、set、sorted set)。
天下没有银弹,NutsDB也有他的局限,比如随着数据量的增大,索引变大,启动会慢, 还有为了保持高性能的写入,目前版本没有做实时的flush/sync操作,依赖内核的回写操作。只想说NutsDB还有很多优化和提高的空间,由于本人精力以及能力有限。所以把这个项目开源出来。更重要的是我认为一个项目需要有人去使用,有人提意见才会成长。
希望看到这个文档的童鞋有兴趣的,一起来参与贡献,欢迎Star、提issues、提交PR ! 参与贡献
NutsDB的安装很简单,首先保证 Golang 已经安装好 (版本要求1.11以上). 然后在终端执行命令:
go get -u github.com/xujiajun/nutsdb
要打开数据库需要使用 nutsdb.Open()
这个方法。其中用到的选项(options)包括 Dir
, EntryIdxMode
和 SegmentSize
,在调用的时候这些参数必须设置。官方提供了DefaultOptions
的选项,直接使用nutsdb.DefaultOptions
即可。当然你也可以根据需要自己定义。
例子:
package main
import (
"log"
"github.com/xujiajun/nutsdb"
)
func main() {
opt := nutsdb.DefaultOptions
opt.Dir = "/tmp/nutsdb" //这边数据库会自动创建这个目录文件
db, err := nutsdb.Open(opt)
if err != nil {
log.Fatal(err)
}
defer db.Close()
...
}
NutsDB为了保证隔离性,防止并发读写事务时候数据的不一致性,同一时间只能执行一个读写事务,但是允许同一时间执行多个只读事务。 不过目前版本的NutsDB不遵循标准的ACID原则。(参见限制和警告)
err := db.Update(
func(tx *nutsdb.Tx) error {
...
return nil
})
err := db.View(
func(tx *nutsdb.Tx) error {
...
return nil
})
从上面的例子看到 DB.View()
和DB.Update()
这两个是数据库调用事务的主要方法。他们本质上是基于 DB.Begin()
方法进行的包装。他们可以帮你自动管理事务的生命周期,从事务的开始、事务的执行、事务提交或者回滚一直到事务的安全的关闭为止,如果中间有错误会返回。所以一般情况下推荐用这种方式去调用事务。
这好比开车有手动挡和自动挡一样, DB.View()
和DB.Update()
等于提供了自动档的效果。
如果你需要手动去开启、执行、关闭事务,你会用到DB.Begin()
方法开启一个事务,tx.Commit()
方法用来提交事务、tx.Rollback()
方法用来回滚事务
例子:
//开始事务
tx, err := db.Begin(true)
if err != nil {
return err
}
bucket := "bucket1"
key := []byte("foo")
val := []byte("bar")
// 使用事务
if err = tx.Put(bucket, key, val, Persistent); err != nil {
// 回滚事务
tx.Rollback()
} else {
// 提交事务
if err = tx.Commit(); err != nil {
tx.Rollback()
return err
}
}
buckets中文翻译过来是桶的意思,你可以理解成类似mysql的table表的概念,也可以理解成命名空间,或者多租户的概念。 所以你可以用他存不同的key的键值对,也可以存相同的key的键值对。所有的key在一个bucket里面不能重复。
例子:
key := []byte("key001")
val := []byte("val001")
bucket001 := "bucket001"
if err := db.Update(
func(tx *nutsdb.Tx) error {
if err := tx.Put(bucket001, key, val, 0); err != nil {
return err
}
return nil
}); err != nil {
log.Fatal(err)
}
bucket002 := "bucket002"
if err := db.Update(
func(tx *nutsdb.Tx) error {
if err := tx.Put(bucket002, key, val, 0); err != nil {
return err
}
return nil
}); err != nil {
log.Fatal(err)
}
将key-value键值对保存在一个bucket, 你可以使用 tx.Put
这个方法:
- 添加数据
if err := db.Update(
func(tx *nutsdb.Tx) error {
key := []byte("name1")
val := []byte("val1")
bucket: = "bucket1"
if err := tx.Put(bucket, key, val, 0); err != nil {
return err
}
return nil
}); err != nil {
log.Fatal(err)
}
- 更新数据
上面的代码执行之后key为"name1"和value值"val1"被保存在命名为bucket1的bucket里面。
如果你要做更新操作,你可以仍然用tx.Put
方法去执行,比如下面的例子把value的值改成"val1-modify":
if err := db.Update(
func(tx *nutsdb.Tx) error {
key := []byte("name1")
val := []byte("val1-modify") // 更新值
bucket: = "bucket1"
if err := tx.Put(bucket, key, val, 0); err != nil {
return err
}
return nil
}); err != nil {
log.Fatal(err)
}
- 获取数据
获取值可以用tx.Get
这个方法:
if err := db.View(
func(tx *nutsdb.Tx) error {
key := []byte("name1")
bucket: = "bucket1"
if e, err := tx.Get(bucket, key); err != nil {
return err
} else {
fmt.Println(string(e.Value)) // "val1-modify"
}
return nil
}); err != nil {
log.Println(err)
}
- 删除数据
删除使用tx.Delete()
方法:
if err := db.Update(
func(tx *nutsdb.Tx) error {
key := []byte("name1")
bucket: = "bucket1"
if err := tx.Delete(bucket, key); err != nil {
return err
}
return nil
}); err != nil {
log.Fatal(err)
}
NusDB支持TTL(存活时间)的功能,可以对指定的bucket里的key过期时间的设置。使用tx.Put
这个方法的使用ttl
参数就可以了。
如果设置 ttl = 0 或者 Persistent, 这个key就会永久存在。下面例子中ttl设置成 60 , 60s之后key就会过期,在查询的时候将不会被搜到。
if err := db.Update(
func(tx *nutsdb.Tx) error {
key := []byte("name1")
val := []byte("val1")
bucket: = "bucket1"
// 如果设置 ttl = 0 or Persistent, 这个key就会永久不删除
// 这边 ttl = 60 , 60s之后就会过期。
if err := tx.Put(bucket, key, val, 60); err != nil {
return err
}
return nil
}); err != nil {
log.Fatal(err)
}
key在一个bucket里面按照byte-sorted有序排序的,所以对于keys的扫描操作,在NutsDB里是很高效的。
对于前缀的扫描,我们可以用PrefixScan
方法, 使用参数 limitNum
来限制返回的结果的数量,比方下面例子限制100个entries:
if err := db.View(
func(tx *nutsdb.Tx) error {
prefix := []byte("user_")
// 限制 100 entries 返回
if entries, err := tx.PrefixScan(bucket, prefix, 100); err != nil {
return err
} else {
keys, es := nutsdb.SortedEntryKeys(entries)
for _, key := range keys {
fmt.Println(key, string(es[key].Value))
}
}
return nil
}); err != nil {
log.Fatal(err)
}
对于范围的扫描,我们可以用 RangeScan
方法.
例子:
if err := db.View(
func(tx *nutsdb.Tx) error {
// 假设用户key从 user_0000000 to user_9999999.
// 执行区间扫描类似这样一个start和end作为主要参数.
start := []byte("user_0010001")
end := []byte("user_0010010")
bucket:= []byte("user_list)
if entries, err := tx.RangeScan(bucket, start, end); err != nil {
return err
} else {
keys, es := nutsdb.SortedEntryKeys(entries)
for _, key := range keys {
fmt.Println(key, string(es[key].Value))
}
}
return nil
}); err != nil {
log.Fatal(err)
}
随着数据越来越多,特别是一些删除或者过期的数据占据着磁盘,清理这些NutsDB提供了db.Merge()
方法,这个方法需要自己根据实际情况编写合并策略。
一旦执行会影响到正常的写请求,所以最好避开高峰期,比如半夜定时执行等。
err := db.Merge()
if err != nil {
...
}
对于数据库的备份,你可以调用 db.Backup()
方法,只要提供一个备份的文件目录地址即可。这个方法执行的是一个热备份,不会阻塞到数据库其他的只读事务操作,对写(读写)事务会有影响。
err = db.Backup(dir)
if err != nil {
...
}
好了,入门指南已经完结。 散花~,到目前为止都是String类型的数据的crud操作,下面将学习其他更多的数据结构的操作。
看到这边我们将学习其他数据结构,Api命名风格模仿 Redis 命令。所以如果你熟悉Redis,将会很快掌握使用。 其他方面继承了上面的bucket/key/value模型,所以你会看到和Redis的Api使用上稍微有些不同,会多一个bucket。
从指定bucket里面的指定队列key的右边入队一个或者多个元素val。
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "bucketForList"
key := []byte("myList")
val := []byte("val1")
return tx.RPush(bucket, key, val)
}); err != nil {
log.Fatal(err)
}
从指定bucket里面的指定队列key的左边入队一个或者多个元素val。
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "bucketForList"
key := []byte("myList")
val := []byte("val2")
return tx.LPush(bucket, key, val)
}); err != nil {
log.Fatal(err)
}
从指定bucket里面的指定队列key的左边出队一个元素,删除并返回。
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "bucketForList"
key := []byte("myList")
if item, err := tx.LPop(bucket, key); err != nil {
return err
} else {
fmt.Println("LPop item:", string(item))
}
return nil
}); err != nil {
log.Fatal(err)
}
从指定bucket里面的指定队列key的左边出队一个元素返回不删除。
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "bucketForList"
key := []byte("myList")
if item, err := tx.LPeek(bucket, key); err != nil {
return err
} else {
fmt.Println("LPeek item:", string(item)) //val11
}
return nil
}); err != nil {
log.Fatal(err)
}
从指定bucket里面的指定队列key的右边出队一个元素,删除并返回。
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "bucketForList"
key := []byte("myList")
if item, err := tx.RPop(bucket, key); err != nil {
return err
} else {
fmt.Println("RPop item:", string(item))
}
return nil
}); err != nil {
log.Fatal(err)
}
从指定bucket里面的指定队列key的右边出队一个元素返回不删除。
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "bucketForList"
key := []byte("myList")
if item, err := tx.RPeek(bucket, key); err != nil {
return err
} else {
fmt.Println("RPeek item:", string(item))
}
return nil
}); err != nil {
log.Fatal(err)
}
返回指定bucket里面的指定队列key列表里指定范围内的元素。 start 和 end 偏移量都是基于0的下标,即list的第一个元素下标是0(list的表头),第二个元素下标是1,以此类推。 偏移量也可以是负数,表示偏移量是从list尾部开始计数。 例如:-1 表示列表的最后一个元素,-2 是倒数第二个,以此类推。
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "bucketForList"
key := []byte("myList")
if items, err := tx.LRange(bucket, key, 0, -1); err != nil {
return err
} else {
//fmt.Println(items)
for _, item := range items {
fmt.Println(string(item))
}
}
return nil
}); err != nil {
log.Fatal(err)
}
从指定bucket里面的指定的key的列表里移除前 count 次出现的值为 value 的元素。 这个 count 参数通过下面几种方式影响这个操作:
count > 0: 从头往尾移除值为 value 的元素。 count < 0: 从尾往头移除值为 value 的元素。 count = 0: 移除所有值为 value 的元素。
下面的例子count=1:
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "bucketForList"
key := []byte("myList")
return tx.LRem(bucket, key, 1)
}); err != nil {
log.Fatal(err)
}
设置指定bucket的指定list的index位置的的值为value。
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "bucketForList"
key := []byte("myList")
if err := tx.LSet(bucket, key, 0, []byte("val11")); err != nil {
return err
} else {
fmt.Println("LSet ok, index 0 item value => val11")
}
return nil
}); err != nil {
log.Fatal(err)
}
修剪一个已存在的 list,这样 list 就会只包含指定范围的指定元素。start 和 stop 都是由0开始计数的, 这里的 0 是列表里的第一个元素(表头),1 是第二个元素,以此类推。
例如: LTRIM foobar 0 2 将会对存储在 foobar 的列表进行修剪,只保留列表里的前3个元素。
start 和 end 也可以用负数来表示与表尾的偏移量,比如 -1 表示列表里的最后一个元素, -2 表示倒数第二个,等等。
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "bucketForList"
key := []byte("myList")
return tx.LTrim(bucket, key, 0, 1)
}); err != nil {
log.Fatal(err)
}
返回指定bucket下指定key列表的size大小
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "bucketForList"
key := []byte("myList")
if size,err := tx.LSize(bucket, key); err != nil {
return err
} else {
fmt.Println("myList size is ",size)
}
return nil
}); err != nil {
log.Fatal(err)
}
添加一个指定的member元素到指定bucket的里的指定集合key中。
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "bucketForSet"
key := []byte("mySet")
return tx.SAdd(bucket, key, []byte("a"), []byte("b"), []byte("c"))
}); err != nil {
log.Fatal(err)
}
返回多个成员member是否是指定bucket的里的指定集合key的成员。
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "bucketForSet"
key := []byte("mySet")
if ok, err := tx.SAreMembers(bucket, key, []byte("a"), []byte("b"), []byte("c")); err != nil {
return err
} else {
fmt.Println("SAreMembers:", ok)
}
return nil
}); err != nil {
log.Fatal(err)
}
返回指定bucket的指定的集合key的基数 (集合元素的数量)。
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "bucketForSet"
key := []byte("mySet")
if num, err := tx.SCard(bucket, key); err != nil {
return err
} else {
fmt.Println("SCard:", num)
}
return nil
}); err != nil {
log.Fatal(err)
}
返回一个集合与给定集合的差集的元素。这两个集合都在一个bucket中。
key1 := []byte("mySet1") // 集合1
key2 := []byte("mySet2") // 集合2
bucket := "bucketForSet"
if err := db.Update(
func(tx *nutsdb.Tx) error {
return tx.SAdd(bucket, key1, []byte("a"), []byte("b"), []byte("c"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
return tx.SAdd(bucket, key2, []byte("c"), []byte("d"))
}); err != nil {
log.Fatal(err)
}
if err := db.View(
func(tx *nutsdb.Tx) error {
if items, err := tx.SDiffByOneBucket(bucket, key1, key2); err != nil {
return err
} else {
fmt.Println("SDiffByOneBucket:", items)
for _, item := range items {
fmt.Println("item", string(item))
}
//item a
//item b
}
return nil
}); err != nil {
log.Fatal(err)
}
返回一个集合与给定集合的差集的元素。这两个集合分别在不同bucket中。
bucket1 := "bucket1"
key1 := []byte("mySet1")
bucket2 := "bucket2"
key2 := []byte("mySet2")
if err := db.Update(
func(tx *nutsdb.Tx) error {
return tx.SAdd(bucket1, key1, []byte("a"), []byte("b"), []byte("c"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
return tx.SAdd(bucket2, key2, []byte("c"), []byte("d"))
}); err != nil {
log.Fatal(err)
}
if err := db.View(
func(tx *nutsdb.Tx) error {
if items, err := tx.SDiffByTwoBuckets(bucket1, key1, bucket2, key2); err != nil {
return err
} else {
fmt.Println("SDiffByTwoBuckets:", items)
for _, item := range items {
fmt.Println("item", string(item))
}
}
return nil
}); err != nil {
log.Fatal(err)
}
判断是否指定的集合在指定的bucket中。
bucket := "bucketForSet"
if err := db.View(
func(tx *nutsdb.Tx) error {
if ok, err := tx.SHasKey(bucket, []byte("mySet")); err != nil {
return err
} else {
fmt.Println("SHasKey", ok)
}
return nil
}); err != nil {
log.Fatal(err)
}
返回成员member是否是指定bucket的存指定key集合的成员。
bucket := "bucketForSet"
if err := db.View(
func(tx *nutsdb.Tx) error {
if ok, err := tx.SIsMember(bucket, []byte("mySet"), []byte("a")); err != nil {
return err
} else {
fmt.Println("SIsMember", ok)
}
return nil
}); err != nil {
log.Fatal(err)
}
返回指定bucket的指定key集合所有的元素。
bucket := "bucketForSet"
if err := db.View(
func(tx *nutsdb.Tx) error {
if items, err := tx.SMembers(bucket, []byte("mySet")); err != nil {
return err
} else {
fmt.Println("SMembers", items)
for _, item := range items {
fmt.Println("item", string(item))
}
}
return nil
}); err != nil {
log.Fatal(err)
}
将member从source集合移动到destination集合中,其中source集合和destination集合均在一个bucket中。
bucket3 := "bucket3"
if err := db.Update(
func(tx *nutsdb.Tx) error {
return SAdd(bucket3, []byte("mySet1"), []byte("a"), []byte("b"), []byte("c"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
return tx.SAdd(bucket3, []byte("mySet2"), []byte("c"), []byte("d"), []byte("e"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
if ok, err := tx.SMoveByOneBucket(bucket3, []byte("mySet1"), []byte("mySet2"), []byte("a")); err != nil {
return err
} else {
fmt.Println("SMoveByOneBucket", ok)
}
return nil
}); err != nil {
log.Fatal(err)
}
if err := db.View(
func(tx *nutsdb.Tx) error {
if items, err := tx.SMembers(bucket3, []byte("mySet1")); err != nil {
return err
} else {
fmt.Println("after SMoveByOneBucket bucket3 mySet1 SMembers", items)
for _, item := range items {
fmt.Println("item", string(item))
}
}
return nil
}); err != nil {
log.Fatal(err)
}
if err := db.View(
func(tx *nutsdb.Tx) error {
if items, err := tx.SMembers(bucket3, []byte("mySet2")); err != nil {
return err
} else {
fmt.Println("after SMoveByOneBucket bucket3 mySet2 SMembers", items)
for _, item := range items {
fmt.Println("item", string(item))
}
}
return nil
}); err != nil {
log.Fatal(err)
}
将member从source集合移动到destination集合中。其中source集合和destination集合在两个不同的bucket中。
bucket4 := "bucket4"
bucket5 := "bucket5"
if err := db.Update(
func(tx *nutsdb.Tx) error {
return tx.SAdd(bucket4, []byte("mySet1"), []byte("a"), []byte("b"), []byte("c"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
return tx.SAdd(bucket5, []byte("mySet2"), []byte("c"), []byte("d"), []byte("e"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
if ok, err := tx.SMoveByTwoBuckets(bucket4, []byte("mySet1"), bucket5, []byte("mySet2"), []byte("a")); err != nil {
return err
} else {
fmt.Println("SMoveByTwoBuckets", ok)
}
return nil
}); err != nil {
log.Fatal(err)
}
if err := db.View(
func(tx *nutsdb.Tx) error {
if items, err := tx.SMembers(bucket4, []byte("mySet1")); err != nil {
return err
} else {
fmt.Println("after SMoveByTwoBuckets bucket4 mySet1 SMembers", items)
for _, item := range items {
fmt.Println("item", string(item))
}
}
return nil
}); err != nil {
log.Fatal(err)
}
if err := db.View(
func(tx *nutsdb.Tx) error {
if items, err := tx.SMembers(bucket5, []byte("mySet2")); err != nil {
return err
} else {
fmt.Println("after SMoveByTwoBuckets bucket5 mySet2 SMembers", items)
for _, item := range items {
fmt.Println("item", string(item))
}
}
return nil
}); err != nil {
log.Fatal(err)
}
从指定bucket里的指定key的集合中移除并返回一个或多个随机元素。
if err := db.Update(
func(tx *nutsdb.Tx) error {
key := []byte("mySet")
if item, err := tx.SPop(bucket, key); err != nil {
return err
} else {
fmt.Println("SPop item from mySet:", string(item))
}
return nil
}); err != nil {
log.Fatal(err)
}
在指定bucket里面移除指定的key集合中移除指定的一个或者多个元素。
bucket6:="bucket6"
if err := db.Update(
func(tx *nutsdb.Tx) error {
return tx.SAdd(bucket6, []byte("mySet"), []byte("a"), []byte("b"), []byte("c"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
if err := tx.SRem(bucket6, []byte("mySet"), []byte("a")); err != nil {
return err
} else {
fmt.Println("SRem ok")
}
return nil
}); err != nil {
log.Fatal(err)
}
if err := db.View(
func(tx *nutsdb.Tx) error {
if items, err := tx.SMembers(bucket6, []byte("mySet")); err != nil {
return err
} else {
fmt.Println("SMembers items:", items)
for _, item := range items {
fmt.Println("item:", string(item))
}
}
return nil
}); err != nil {
log.Fatal(err)
}
返回指定一个bucket里面的给定的两个集合的并集中的所有成员。
bucket7 := "bucket1"
key1 := []byte("mySet1")
key2 := []byte("mySet2")
if err := db.Update(
func(tx *nutsdb.Tx) error {
return tx.SAdd(bucket7, key1, []byte("a"), []byte("b"), []byte("c"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
return tx.SAdd(bucket7, key2, []byte("c"), []byte("d"))
}); err != nil {
log.Fatal(err)
}
if err := db.View(
func(tx *nutsdb.Tx) error {
if items, err := tx.SUnionByOneBucket(bucket7, key1, key2); err != nil {
return err
} else {
fmt.Println("SUnionByOneBucket:", items)
for _, item := range items {
fmt.Println("item", string(item))
}
}
return nil
}); err != nil {
log.Fatal(err)
}
返回指定两个bucket里面的给定的两个集合的并集中的所有成员。
bucket8 := "bucket1"
key1 := []byte("mySet1")
bucket9 := "bucket2"
key2 := []byte("mySet2")
if err := db.Update(
func(tx *nutsdb.Tx) error {
return tx.SAdd(bucket8, key1, []byte("a"), []byte("b"), []byte("c"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
return tx.SAdd(bucket9, key2, []byte("c"), []byte("d"))
}); err != nil {
log.Fatal(err)
}
if err := db.View(
func(tx *nutsdb.Tx) error {
if items, err := tx.SUnionByTwoBuckets(bucket8, key1, bucket9, key2); err != nil {
return err
} else {
fmt.Println("SUnionByTwoBucket:", items)
for _, item := range items {
fmt.Println("item", string(item))
}
}
return nil
}); err != nil {
log.Fatal(err)
}
注意:这边的bucket是有序集合名。
将指定成员(包括key、score、value)添加到指定bucket的有序集合(sorted set)里面。
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet1" // 注意:这边的bucket是有序集合名
key := []byte("key1")
return tx.ZAdd(bucket, key, 1, []byte("val1"))
}); err != nil {
log.Fatal(err)
}
返回指定bucket的的有序集元素个数。
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "myZSet1"
if num, err := tx.ZCard(bucket); err != nil {
return err
} else {
fmt.Println("ZCard num", num)
}
return nil
}); err != nil {
log.Fatal(err)
}
返回指定bucket的有序集,score值在min和max之间(默认包括score值等于start或end)的成员。
Opts包含的参数:
- Limit int // 限制返回的node数目
- ExcludeStart bool // 排除start
- ExcludeEnd bool // 排除end
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "myZSet1"
if num, err := tx.ZCount(bucket, 0, 1, nil); err != nil {
return err
} else {
fmt.Println("ZCount num", num)
}
return nil
}); err != nil {
log.Fatal(err)
}
返回一个节点通过指定的bucket有序集合和指定的key来获取。
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "myZSet1"
key := []byte("key2")
if node, err := tx.ZGetByKey(bucket, key); err != nil {
return err
} else {
fmt.Println("ZGetByKey key2 val:", string(node.Value))
}
return nil
}); err != nil {
log.Fatal(err)
}
返回所有成员通过在指定的bucket。
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "myZSet1"
if nodes, err := tx.ZMembers(bucket); err != nil {
return err
} else {
fmt.Println("ZMembers:", nodes)
for _, node := range nodes {
fmt.Println("member:", node.Key(), string(node.Value))
}
}
return nil
}); err != nil {
log.Fatal(err)
}
返回指定bucket有序集合中的具有最高得分的成员。
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "myZSet1"
if node, err := tx.ZPeekMax(bucket); err != nil {
return err
} else {
fmt.Println("ZPeekMax:", string(node.Value))
}
return nil
}); err != nil {
log.Fatal(err)
}
返回指定bucket有序集合中的具有最低得分的成员。
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "myZSet1"
if node, err := tx.ZPeekMin(bucket); err != nil {
return err
} else {
fmt.Println("ZPeekMin:", string(node.Value))
}
return nil
}); err != nil {
log.Fatal(err)
}
删除并返回指定bucket有序集合中的具有最高得分的成员。
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet1"
if node, err := tx.ZPopMax(bucket); err != nil {
return err
} else {
fmt.Println("ZPopMax:", string(node.Value)) //val3
}
return nil
}); err != nil {
log.Fatal(err)
}
删除并返回指定bucket有序集合中的具有最低得分的成员。
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet1"
if node, err := tx.ZPopMin(bucket); err != nil {
return err
} else {
fmt.Println("ZPopMin:", string(node.Value)) //val1
}
return nil
}); err != nil {
log.Fatal(err)
}
返回指定bucket有序集合的排名start到end的范围(包括start和end)的所有元素。
// ZAdd add items
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet2"
key1 := []byte("key1")
return tx.ZAdd(bucket, key1, 1, []byte("val1"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet2"
key2 := []byte("key2")
return tx.ZAdd(bucket, key2, 2, []byte("val2"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet2"
key3 := []byte("key3")
return tx.ZAdd(bucket, key3, 3, []byte("val3"))
}); err != nil {
log.Fatal(err)
}
// ZRangeByRank
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "myZSet2"
if nodes, err := tx.ZRangeByRank(bucket, 1, 2); err != nil {
return err
} else {
fmt.Println("ZRangeByRank nodes :", nodes)
for _, node := range nodes {
fmt.Println("item:", node.Key(), node.Score())
}
//item: key1 1
//item: key2 2
}
return nil
}); err != nil {
log.Fatal(err)
}
返回指定bucket有序集合的分数start到end的范围(包括start和end)的所有元素。其中有个Opts
参数用法参考ZCount
。
// ZAdd
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet3"
key1 := []byte("key1")
return tx.ZAdd(bucket, key1, 70, []byte("val1"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet3"
key2 := []byte("key2")
return tx.ZAdd(bucket, key2, 90, []byte("val2"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet3"
key3 := []byte("key3")
return tx.ZAdd(bucket, key3, 86, []byte("val3"))
}); err != nil {
log.Fatal(err)
}
// ZRangeByScore
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "myZSet3"
if nodes, err := tx.ZRangeByScore(bucket, 80, 100,nil); err != nil {
return err
} else {
fmt.Println("ZRangeByScore nodes :", nodes)
for _, node := range nodes {
fmt.Println("item:", node.Key(), node.Score())
}
//item: key3 86
//item: key2 90
}
return nil
}); err != nil {
log.Fatal(err)
}
返回有序集bucket中成员指定成员key的排名。其中有序集成员按score值递增(从小到大)顺序排列。注意排名以1为底,也就是说,score值最小的成员排名为1。 这点和Redis不同,Redis是从0开始的。
// ZAdd
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet4"
key1 := []byte("key1")
return tx.ZAdd(bucket, key1, 70, []byte("val1"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet4"
key2 := []byte("key2")
return tx.ZAdd(bucket, key2, 90, []byte("val2"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet4"
key3 := []byte("key3")
return tx.ZAdd(bucket, key3, 86, []byte("val3"))
}); err != nil {
log.Fatal(err)
}
// ZRank
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "myZSet4"
key1 := []byte("key1")
if rank, err := tx.ZRank(bucket, key1); err != nil {
return err
} else {
fmt.Println("key1 ZRank :", rank) // key1 ZRank : 1
}
return nil
}); err != nil {
log.Fatal(err)
}
返回有序集bucket中成员指定成员key的反向排名。其中有序集成员还是按score值递增(从小到大)顺序排列。但是获取反向排名,注意排名还是以1为开始,也就是说,但是这个时候score值最大的成员排名为1。
// ZAdd
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet8"
key1 := []byte("key1")
return tx.ZAdd(bucket, key1, 10, []byte("val1"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet8"
key2 := []byte("key2")
return tx.ZAdd(bucket, key2, 20, []byte("val2"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet8"
key3 := []byte("key3")
return tx.ZAdd(bucket, key3, 30, []byte("val3"))
}); err != nil {
log.Fatal(err)
}
// ZRevRank
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "myZSet8"
if rank, err := tx.ZRevRank(bucket, []byte("key3")); err != nil {
return err
} else {
fmt.Println("ZRevRank key1 rank:", rank) //ZRevRank key3 rank: 1
}
return nil
}); err != nil {
log.Fatal(err)
}
删除指定成员key在一个指定的有序集合bucket中。
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet5"
key1 := []byte("key1")
return tx.ZAdd(bucket, key1, 10, []byte("val1"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet5"
key2 := []byte("key2")
return tx.ZAdd(bucket, key2, 20, []byte("val2"))
}); err != nil {
log.Fatal(err)
}
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "myZSet5"
if nodes,err := tx.ZMembers(bucket); err != nil {
return err
} else {
fmt.Println("before ZRem key1, ZMembers nodes",nodes)
for _,node:=range nodes {
fmt.Println("item:",node.Key(),node.Score())
}
}
// before ZRem key1, ZMembers nodes map[key1:0xc00008cfa0 key2:0xc00008d090]
// item: key1 10
// item: key2 20
return nil
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet5"
if err := tx.ZRem(bucket, "key1"); err != nil {
return err
}
return nil
}); err != nil {
log.Fatal(err)
}
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "myZSet5"
if nodes,err := tx.ZMembers(bucket); err != nil {
return err
} else {
fmt.Println("after ZRem key1, ZMembers nodes",nodes)
for _,node:=range nodes {
fmt.Println("item:",node.Key(),node.Score())
}
// after ZRem key1, ZMembers nodes map[key2:0xc00008d090]
// item: key2 20
}
return nil
}); err != nil {
log.Fatal(err)
}
删除所有成员满足排名start到end(包括start和end)在一个指定的有序集合bucket中。其中排名以1开始,排名1表示第一个节点元素,排名-1表示最后的节点元素。
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet6"
key1 := []byte("key1")
return tx.ZAdd(bucket, key1, 10, []byte("val1"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet6"
key2 := []byte("key2")
return tx.ZAdd(bucket, key2, 20, []byte("val2"))
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet6"
key3 := []byte("key3")
return tx.ZAdd(bucket, key3, 30, []byte("val2"))
}); err != nil {
log.Fatal(err)
}
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "myZSet6"
if nodes,err := tx.ZMembers(bucket); err != nil {
return err
} else {
fmt.Println("before ZRemRangeByRank, ZMembers nodes",nodes)
for _,node:=range nodes {
fmt.Println("item:",node.Key(),node.Score())
}
// before ZRemRangeByRank, ZMembers nodes map[key3:0xc00008d450 key1:0xc00008d270 key2:0xc00008d360]
// item: key1 10
// item: key2 20
// item: key3 30
}
return nil
}); err != nil {
log.Fatal(err)
}
if err := db.Update(
func(tx *nutsdb.Tx) error {
bucket := "myZSet6"
if err := tx.ZRemRangeByRank(bucket, 1,2); err != nil {
return err
}
return nil
}); err != nil {
log.Fatal(err)
}
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "myZSet6"
if nodes,err := tx.ZMembers(bucket); err != nil {
return err
} else {
fmt.Println("after ZRemRangeByRank, ZMembers nodes",nodes)
for _,node:=range nodes {
fmt.Println("item:",node.Key(),node.Score())
}
// after ZRemRangeByRank, ZMembers nodes map[key3:0xc00008d450]
// item: key3 30
// key1 ZScore 10
}
return nil
}); err != nil {
log.Fatal(err)
}
返回指定有序集bucket中,成员key的score值。
if err := db.View(
func(tx *nutsdb.Tx) error {
bucket := "myZSet7"
if score,err := tx.ZScore(bucket, []byte("key1")); err != nil {
return err
} else {
fmt.Println("ZScore key1 score:",score)
}
return nil
}); err != nil {
log.Fatal(err)
}
BoltDB和NutsDB很相似都是内嵌型的key-value数据库,同时支持事务。Bolt基于B+tree引擎模型,只有一个文件,NutsDB基于bitcask引擎模型,会生成多个文件。当然他们都支持范围扫描和前缀扫描这两个实用的特性。
LevelDB 和 RocksDB 都是基于LSM tree模型.其中LevelDB 不支持事务,不支持bucket。 RocksDB目前还没看到golang实现的版本。
Badger也是基于LSM tree模型。但是写性能没有我想象中高。不支持bucket。
另外,以上数据库均不支持多种数据结构如list、set、sorted set,而NutsDB从0.2.0版本开始支持这些数据结构。
TODO
- 启动索引模式
NutsDB在启动的时候提供了2种索引模式,HintAndRAMIdxMode
和HintAndMemoryMapIdxMode
,默认使用HintAndRAMIdxMode
,在基本的功能的string数据类型(put、get、delete、rangeScan、PrefixScan)这两种模式都支持。HintAndRAMIdxMode
,作为数据库默认选项,他是全内存索引,读写性能都很高。他的瓶颈在于内存。如果你内存够的话,这种默认是适合的。如果你需要存下大于内存的数据,可以使用另一种模式HintAndMemoryMapIdxMode
,他会把value存磁盘,通过索引去找offset,这种模式特别适合value远大于key的场景。他的读性能要比起默认模式要降低不少,具体看自己的要求。关于其他的数据结构(list\set\sorted set)不支持HintAndMemoryMapIdxModee这个模式,只支持默认的HintAndRAMIdxMode,所以如果你要用到其他数据结构如list、set等。请根据需要选模式。
- Segment配置问题
NutsDB会自动切割分成一个个块(Segment),默认SegmentSize
是8MB,这个参数可以自己配置,但是一旦配置不能修改。
- key和value的大小限制问题
关于key和value的大小受到SegmentSize的大小的影响,比如SegmentSize为8M,key和value的大小肯定是小于8M的,不然会返回错误。
在NutsDB里面entry是最小单位,只要保证entry不大于SegmentSize
就可以了。
- entry的大小问题
entry的的大小=EntryHeader的大小+key的大小+value的大小+bucket的大小
- 关于支持的操作系统
目前对Mac OS X 和 Linux 支持,Windows平台暂时不予支持。
- 关于事务说明
在传统的关系式数据库中,常常用 ACID 性质来检验事务功能的安全性,NutsDB目前的版本并没有完全支持ACID。
这这特别感谢 @damnever 给我提的issue给我指出,特别在这说明下,免得误导大家。
目前版本NutsDB支持(A)原子性、C(一致性)、I(隔离性)。但并不保证(D)持久化。以下参考wiki百科的对ACID定义分别讲一下。如讲的有误,欢迎帮我指正。
(预告:一个版本会支持D特性)
1、(A)原子性
所谓原子性,一个事务(transaction)中的所有操作,或者全部完成,或者全部不完成,不会结束在中间某个环节。实现事务的原子性,要支持回滚操作,在某个操作失败后,回滚到事务执行之前的状态。一般的做法是类似数据快照的方案。关于这一点,NutsDB支持回滚操作。NutsDB的作法是先实际预演一边所有要执行的操作,这个时候数据其实还是uncommitted状态,一直到所有环节都没有问题,才会作commit操作,如果中间任何环节一旦发生错误,直接作rollback回滚操作,保证原子性。 就算发生错误的时候已经有数据进磁盘,下次启动也不会被索引到这些数据。
2、(C)一致性
在事务开始之前和事务结束以后,数据库的完整性没有被破坏。这表示写入的数据必须完全符合预期的。NutsDB基于读写锁实现锁机制,在高并发场景下,一个读写事务具有排他性的,比如一个goroutine需要执行一个读写事务,其他不管想要读写的事务或者只读的只能等待,直到这个锁释放为止。保证了数据的一致性。所以这一点NutsDB满足一致性。
3、(I)隔离性
数据库允许多个并发事务同时对其数据进行读写和修改的能力,隔离性可以防止多个事务并发执行时由于交叉执行而导致数据的不一致。如上面的一致性所说,NutsDB基于读写锁实现锁机制。不会出现数据串的情况。所以也是满足隔离性的。
关于事务的隔离级别,我们也来对照wiki百科,来看下NutsDB属于哪一个级别:
这个是最低的隔离级别。允许“脏读”(dirty reads),事务可以看到其他事务“尚未提交”的修改。很明显nutsDB是避免脏读的。
定义:这个隔离级别中,基于锁机制并发控制的DBMS需要对选定对象的写锁一直保持到事务结束,但是读锁在SELECT操作完成后马上释放(因此“不可重复读”现象可能会发生)。 看下“不可重复读”的定义:在一次事务中,当一行数据获取两遍得到不同的结果表示发生了“不可重复读”。
nutsDB不会出现“不可重复读”这种情况,当高并发的时候,正在进行读写操作,一个goroutine刚好先拿到只读锁,这个时候要完成一个读写事务操作的那个goroutine要阻塞等到只读锁释放为止。也就避免上面的问题。
定义:这个隔离级别中,基于锁机制并发控制的DBMS需要对选定对象的读锁(read locks)和写锁(write locks)一直保持到事务结束,但不要求“范围锁”,因此可能会发生“幻影读”。
关于幻影读定义,指在事务执行过程中,当两个完全相同的查询语句执行得到不同的结果集。这种现象称为“幻影读(phantom read)”,有些人也叫他幻读,正如上面所说,在nutsDB中,当进行只读操作的时候,同一时间只能并发只读操作,其他有关“写”的事务是被阻塞的,直到这些只读锁释放为止,因此不会出现“幻影读”的情况。
定义:这个隔离级别是最高的。避免了所有上面的“脏读”、不可重复读”、“幻影读”现象。
在nutsDB中,一个只读事务和一个写(读写)事务,是互斥的,需要串行执行,不会出现并发执行。nutsDB属于这个可串行化级别。 这个级别的隔离一般来说在高并发场景下性能会受到影响。但是如果锁本身性能还可以,也不失为一个简单有效的方法。当前版本nutsDB基于读写锁,在并发读多写少的场景下,性能会好一点。
4、(D)持久化
事务处理结束后,对数据的修改就是永久的,即便系统故障也不会丢失。目前版本的nutsdb为了提供高性能的写入,并没有实时的做sync操作,依赖于内核的回写(进程挂掉还是会回写的),你可以理解成作了缓冲,这是一个乐观的做法,这个时候同步工作交给内核,内核会负责将“脏”数据写回磁盘,但这个时间点,应用层没发确定,当积累到一定量(一个Segment大小),做一次munmap操作,将数据做update。这样的作法也就没有满足持久化的特性定义。
关与其他信息待补充。有错误请帮忙指出,提给我issue,谢谢。
👍🎉 首先感谢你能看到这里,参与贡献 🎉👍
参与贡献方式不限于:
- 提各种issues(包括询问问题、提功能建议、性能建议等)
- 提交bug
- 提pull requests
- 优化修改README文档
详情参考英文版的 CONTRIBUTING 。
这个项目受到以下项目或多或少的灵感和帮助:
The NutsDB is open-sourced software licensed under the Apache 2.0 license.