Skip to content

Commit

Permalink
Reimplement script commands against hive script
Browse files Browse the repository at this point in the history
Signed-off-by: Jussi Maki <[email protected]>
  • Loading branch information
joamaki committed Oct 10, 2024
1 parent 70dced9 commit 232e3d0
Show file tree
Hide file tree
Showing 21 changed files with 1,230 additions and 731 deletions.
98 changes: 85 additions & 13 deletions any_table.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package statedb

import (
"fmt"
"iter"
)

Expand All @@ -12,8 +16,13 @@ type AnyTable struct {
}

func (t AnyTable) All(txn ReadTxn) iter.Seq2[any, Revision] {
all, _ := t.AllWatch(txn)
return all
}

func (t AnyTable) AllWatch(txn ReadTxn) (iter.Seq2[any, Revision], <-chan struct{}) {
indexTxn := txn.getTxn().mustIndexReadTxn(t.Meta, PrimaryIndexPos)
return partSeq[any](indexTxn.Iterator())
return partSeq[any](indexTxn.Iterator()), indexTxn.RootWatch()
}

func (t AnyTable) UnmarshalYAML(data []byte) (any, error) {
Expand All @@ -38,22 +47,85 @@ func (t AnyTable) Delete(txn WriteTxn, obj any) (old any, hadOld bool, err error
return
}

func (t AnyTable) Prefix(txn ReadTxn, key string) iter.Seq2[any, Revision] {
indexTxn := txn.getTxn().mustIndexReadTxn(t.Meta, PrimaryIndexPos)
iter, _ := indexTxn.Prefix([]byte(key))
if indexTxn.unique {
return partSeq[any](iter)
func (t AnyTable) Get(txn ReadTxn, index string, key string) (any, Revision, bool, error) {
itxn, rawKey, err := t.queryIndex(txn, index, key)
if err != nil {
return nil, 0, false, err
}
if itxn.unique {
obj, _, ok := itxn.Get(rawKey)
return obj.data, obj.revision, ok, nil
}
return nonUniqueSeq[any](iter, true, []byte(key))
// For non-unique indexes we need to prefix search and make sure to fully
// match the secondary key.
iter, _ := itxn.Prefix(rawKey)
for {
k, obj, ok := iter.Next()
if !ok {
break
}
secondary, _ := decodeNonUniqueKey(k)
if len(secondary) == len(rawKey) {
return obj.data, obj.revision, true, nil
}
}
return nil, 0, false, nil
}

func (t AnyTable) LowerBound(txn ReadTxn, key string) iter.Seq2[any, Revision] {
indexTxn := txn.getTxn().mustIndexReadTxn(t.Meta, PrimaryIndexPos)
iter := indexTxn.LowerBound([]byte(key))
if indexTxn.unique {
return partSeq[any](iter)
func (t AnyTable) Prefix(txn ReadTxn, index string, key string) (iter.Seq2[any, Revision], error) {
itxn, rawKey, err := t.queryIndex(txn, index, key)
if err != nil {
return nil, err
}
iter, _ := itxn.Prefix(rawKey)
if itxn.unique {
return partSeq[any](iter), nil
}
return nonUniqueSeq[any](iter, true, rawKey), nil
}

func (t AnyTable) LowerBound(txn ReadTxn, index string, key string) (iter.Seq2[any, Revision], error) {
itxn, rawKey, err := t.queryIndex(txn, index, key)
if err != nil {
return nil, err
}
iter := itxn.LowerBound(rawKey)
if itxn.unique {
return partSeq[any](iter), nil
}
return nonUniqueLowerBoundSeq[any](iter, rawKey), nil
}

func (t AnyTable) List(txn ReadTxn, index string, key string) (iter.Seq2[any, Revision], error) {
itxn, rawKey, err := t.queryIndex(txn, index, key)
if err != nil {
return nil, err
}
iter, _ := itxn.Prefix(rawKey)
if itxn.unique {
// Unique index means that there can be only a single matching object.
// Doing a Get() is more efficient than constructing an iterator.
value, _, ok := itxn.Get(rawKey)
return func(yield func(any, Revision) bool) {
if ok {
yield(value.data, value.revision)
}
}, nil
}
return nonUniqueSeq[any](iter, false, rawKey), nil
}

func (t AnyTable) queryIndex(txn ReadTxn, index string, key string) (indexReadTxn, []byte, error) {
indexer := t.Meta.getIndexer(index)
if indexer == nil {
return indexReadTxn{}, nil, fmt.Errorf("invalid index %q", index)
}
rawKey, err := indexer.fromString(key)
if err != nil {
return indexReadTxn{}, nil, err
}
return nonUniqueLowerBoundSeq[any](iter, []byte(key))
itxn, err := txn.getTxn().indexReadTxn(t.Meta, indexer.pos)
return itxn, rawKey, err
}

func (t AnyTable) TableHeader() []string {
Expand Down
1 change: 1 addition & 0 deletions cell.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var Cell = cell.Module(

cell.Provide(
newHiveDB,
ScriptCommands,
),
)

Expand Down
24 changes: 14 additions & 10 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,20 @@ func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn {
lockAt := time.Now()
smus.Lock()
acquiredAt := time.Now()

root := *db.root.Load()
tableEntries := make([]*tableEntry, len(root))

txn := &txn{
db: db,
root: root,
handle: db.handleName,
acquiredAt: time.Now(),
writeTxn: writeTxn{
modifiedTables: tableEntries,
smus: smus,
},
}

var tableNames []string
for _, table := range allTables {
tableEntry := root[table.tablePos()]
Expand All @@ -223,26 +234,19 @@ func (db *DB) WriteTxn(table TableMeta, tables ...TableMeta) WriteTxn {
table.Name(),
table.sortableMutex().AcquireDuration(),
)
table.acquired(txn)
}

// Sort the table names so they always appear ordered in metrics.
sort.Strings(tableNames)
txn.tableNames = tableNames

db.metrics.WriteTxnTotalAcquisition(
db.handleName,
tableNames,
acquiredAt.Sub(lockAt),
)

txn := &txn{
db: db,
root: root,
modifiedTables: tableEntries,
smus: smus,
acquiredAt: acquiredAt,
tableNames: tableNames,
handle: db.handleName,
}
runtime.SetFinalizer(txn, txnFinalizer)
return txn
}
Expand Down
43 changes: 33 additions & 10 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"log/slog"
"runtime"
"slices"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -47,26 +49,52 @@ func (t testObject) String() string {
return fmt.Sprintf("testObject{ID: %d, Tags: %v}", t.ID, t.Tags)
}

func (t testObject) TableHeader() []string {
return []string{"ID", "Tags"}
}

func (t testObject) TableRow() []string {
return []string{
strconv.FormatUint(uint64(t.ID), 10),
strings.Join(slices.Collect(t.Tags.All()), ", "),
}
}

var (
idIndex = Index[testObject, uint64]{
Name: "id",
FromObject: func(t testObject) index.KeySet {
return index.NewKeySet(index.Uint64(t.ID))
},
FromKey: index.Uint64,
Unique: true,
FromString: func(key string) (index.Key, error) {
v, err := strconv.ParseUint(key, 10, 64)
return index.Uint64(v), err
},
Unique: true,
}

tagsIndex = Index[testObject, string]{
Name: "tags",
FromObject: func(t testObject) index.KeySet {
return index.Set(t.Tags)
},
FromKey: index.String,
Unique: false,
FromKey: index.String,
FromString: index.FromString,
Unique: false,
}
)

func newTestObjectTable(t testing.TB, name string, secondaryIndexers ...Indexer[testObject]) RWTable[testObject] {
table, err := NewTable(
name,
idIndex,
secondaryIndexers...,
)
require.NoError(t, err, "NewTable[testObject]")
return table
}

const (
INDEX_TAGS = true
NO_INDEX_TAGS = false
Expand All @@ -82,12 +110,7 @@ func newTestDBWithMetrics(t testing.TB, metrics Metrics, secondaryIndexers ...In
var (
db *DB
)
table, err := NewTable(
"test",
idIndex,
secondaryIndexers...,
)
require.NoError(t, err, "NewTable[testObject]")
table := newTestObjectTable(t, "test", secondaryIndexers...)

h := hive.New(
cell.Provide(func() Metrics { return metrics }),
Expand Down Expand Up @@ -237,7 +260,7 @@ func TestDB_Prefix(t *testing.T) {
txn := db.ReadTxn()

iter, watch := table.PrefixWatch(txn, tagsIndex.Query("ab"))
require.Equal(t, Collect(Map(iter, testObject.getID)), []uint64{71, 82})
require.Equal(t, []uint64{71, 82}, Collect(Map(iter, testObject.getID)))

select {
case <-watch:
Expand Down
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ go 1.23
require (
github.com/cilium/hive v0.0.0-20241009102328-2ab688845f23
github.com/cilium/stream v0.0.0-20240209152734-a0792b51812d
github.com/rogpeppe/go-internal v1.11.0
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
go.uber.org/goleak v1.3.0
golang.org/x/term v0.16.0
golang.org/x/time v0.5.0
gopkg.in/yaml.v3 v3.0.1
)
Expand All @@ -34,7 +34,6 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/term v0.16.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.17.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
Expand Down
4 changes: 4 additions & 0 deletions index/string.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ func String(s string) Key {
return []byte(s)
}

func FromString(s string) (Key, error) {
return String(s), nil
}

func Stringer[T fmt.Stringer](s T) Key {
return String(s.String())
}
Expand Down
41 changes: 41 additions & 0 deletions internal/time.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium

package internal

import (
"fmt"
"time"
)

func PrettySince(t time.Time) string {
return PrettyDuration(time.Since(t))
}

func PrettyDuration(d time.Duration) string {
ago := float64(d) / float64(time.Microsecond)

// micros
if ago < 1000.0 {
return fmt.Sprintf("%.1fus", ago)
}

// millis
ago /= 1000.0
if ago < 1000.0 {
return fmt.Sprintf("%.1fms", ago)
}
// secs
ago /= 1000.0
if ago < 60.0 {
return fmt.Sprintf("%.1fs", ago)
}
// mins
ago /= 60.0
if ago < 60.0 {
return fmt.Sprintf("%.1fm", ago)
}
// hours
ago /= 60.0
return fmt.Sprintf("%.1fh", ago)
}
5 changes: 2 additions & 3 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,12 @@ func nonUniqueSeq[Obj any](iter *part.Iterator[object], prefixSearch bool, searc

secondary, primary := decodeNonUniqueKey(key)

// The secondary key is shorter than what we're looking for, e.g.
// we match into the primary key. Keep searching for matching secondary
// keys.
switch {
case !prefixSearch && len(secondary) != len(searchKey):
// This a List(), thus secondary key must match length exactly.
continue
case prefixSearch && len(secondary) < len(searchKey):
// This is Prefix(), thus key must be equal or longer to search key.
continue
}

Expand Down
Loading

0 comments on commit 232e3d0

Please sign in to comment.