Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sector root caching #136

Merged
merged 11 commits into from
Aug 4, 2023
9 changes: 7 additions & 2 deletions host/contracts/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

lru "github.com/hashicorp/golang-lru/v2"
rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.uber.org/zap"
Expand Down Expand Up @@ -150,8 +151,9 @@ type (
store ContractStore
log *zap.Logger

once sync.Once
done func() // done is called when the updater is closed.
rootsCache *lru.TwoQueueCache[types.FileContractID, []types.Hash256] // reference to the cache in the contract manager
once sync.Once
done func() // done is called when the updater is closed.

sectors uint64
contractID types.FileContractID
Expand Down Expand Up @@ -333,11 +335,14 @@ func (cu *ContractUpdater) Commit(revision SignedRevision, usage Usage) error {
}

start := time.Now()
// revise the contract
err := cu.store.ReviseContract(revision, usage, cu.sectors, cu.sectorActions)
if err == nil {
// clear the committed sector actions
cu.sectorActions = cu.sectorActions[:0]
}
// update the roots cache
cu.rootsCache.Add(revision.Revision.ParentID, cu.sectorRoots[:])
cu.log.Debug("contract update committed", zap.String("contractID", revision.Revision.ParentID.String()), zap.Uint64("revision", revision.Revision.RevisionNumber), zap.Duration("elapsed", time.Since(start)))
return err
}
170 changes: 170 additions & 0 deletions host/contracts/contracts_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package contracts_test

import (
"path/filepath"
"testing"

rhp2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/hostd/host/alerts"
"go.sia.tech/hostd/host/contracts"
"go.sia.tech/hostd/host/storage"
"go.sia.tech/hostd/internal/test"
"go.sia.tech/hostd/persist/sqlite"
"go.uber.org/zap/zaptest"
"lukechampine.com/frand"
)

func TestContractUpdater(t *testing.T) {
const sectors = 256
hostKey := types.NewPrivateKeyFromSeed(frand.Bytes(32))
renterKey := types.NewPrivateKeyFromSeed(frand.Bytes(32))
dir := t.TempDir()

log := zaptest.NewLogger(t)
db, err := sqlite.OpenDatabase(filepath.Join(dir, "hostd.db"), log.Named("sqlite"))
if err != nil {
t.Fatal(err)
}
defer db.Close()

node, err := test.NewWallet(hostKey, t.TempDir(), log.Named("wallet"))
if err != nil {
t.Fatal(err)
}
defer node.Close()

am := alerts.NewManager()
s, err := storage.NewVolumeManager(db, am, node.ChainManager(), log.Named("storage"), sectorCacheSize)
if err != nil {
t.Fatal(err)
}
defer s.Close()

// create a fake volume so disk space is not used
id, err := db.AddVolume("test", false)
if err != nil {
t.Fatal(err)
} else if err := db.GrowVolume(id, sectors); err != nil {
t.Fatal(err)
} else if err := db.SetAvailable(id, true); err != nil {
t.Fatal(err)
}

c, err := contracts.NewManager(db, am, s, node.ChainManager(), node.TPool(), node, log.Named("contracts"))
if err != nil {
t.Fatal(err)
}
defer c.Close()

contractUnlockConditions := types.UnlockConditions{
PublicKeys: []types.UnlockKey{
renterKey.PublicKey().UnlockKey(),
hostKey.PublicKey().UnlockKey(),
},
SignaturesRequired: 2,
}
rev := contracts.SignedRevision{
Revision: types.FileContractRevision{
FileContract: types.FileContract{
UnlockHash: types.Hash256(contractUnlockConditions.UnlockHash()),
WindowStart: 100,
WindowEnd: 200,
},
ParentID: frand.Entropy256(),
UnlockConditions: contractUnlockConditions,
},
}

if err := c.AddContract(rev, []types.Transaction{}, types.ZeroCurrency, contracts.Usage{}); err != nil {
t.Fatal(err)
}

var roots []types.Hash256

tests := []struct {
name string
append int
swap [][2]uint64
trim uint64
}{
{
name: "single root",
append: 1,
},
{
name: "multiple roots",
append: 100,
},
{
name: "swap roots",
swap: [][2]uint64{{0, 1}, {2, 3}, {4, 5}, {0, 100}},
},
{
name: "trim roots",
trim: 3,
},
{
name: "append, swap, trim",
append: 1,
swap: [][2]uint64{{50, 68}},
trim: 10,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
updater, err := c.ReviseContract(rev.Revision.ParentID)
if err != nil {
t.Fatal(err)
}
defer updater.Close()

for i := 0; i < test.append; i++ {
root := frand.Entropy256()
release, err := db.StoreSector(root, func(loc storage.SectorLocation, exists bool) error { return nil })
if err != nil {
t.Fatal(err)
}
defer release()
updater.AppendSector(root)
roots = append(roots, root)
}

for _, swap := range test.swap {
if err := updater.SwapSectors(swap[0], swap[1]); err != nil {
t.Fatal(err)
}
roots[swap[0]], roots[swap[1]] = roots[swap[1]], roots[swap[0]]
}

if err := updater.TrimSectors(test.trim); err != nil {
t.Fatal(err)
}
roots = roots[:len(roots)-int(test.trim)]

if updater.MerkleRoot() != rhp2.MetaRoot(roots) {
t.Fatal("wrong merkle root")
} else if err := updater.Commit(rev, contracts.Usage{}); err != nil {
t.Fatal(err)
} else if err := updater.Close(); err != nil {
t.Fatal(err)
}

// check that the sector roots are correct in the database
dbRoots, err := db.SectorRoots(rev.Revision.ParentID)
if err != nil {
t.Fatal(err)
} else if rhp2.MetaRoot(dbRoots) != rhp2.MetaRoot(roots) {
t.Fatal("wrong merkle root in database")
}
// check that the cache sector roots are correct
cachedRoots, err := c.SectorRoots(rev.Revision.ParentID, 0, 0)
if err != nil {
t.Fatal(err)
} else if rhp2.MetaRoot(cachedRoots) != rhp2.MetaRoot(roots) {
t.Fatal("wrong merkle root in cache")
}
})
}
}
2 changes: 1 addition & 1 deletion host/contracts/integrity.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (cm *ContractManager) CheckIntegrity(ctx context.Context, contractID types.

expectedRoots := contract.Revision.Filesize / rhpv2.SectorSize

roots, err := cm.store.SectorRoots(contractID, 0, 0)
roots, err := cm.getSectorRoots(contractID, 0, 0)
if err != nil {
return nil, 0, fmt.Errorf("failed to get sector roots: %w", err)
} else if uint64(len(roots)) != expectedRoots {
Expand Down
54 changes: 51 additions & 3 deletions host/contracts/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync/atomic"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"gitlab.com/NebulousLabs/encoding"
"go.sia.tech/core/consensus"
rhpv2 "go.sia.tech/core/rhp/v2"
Expand All @@ -21,6 +22,12 @@ import (
"go.uber.org/zap"
)

// sectorRootCacheSize is the number of contracts' sector roots to cache.
// Caching prevents frequently updated contracts from continuously hitting the
// DB. This is left as a hard-coded small value to limit memory usage since
// contracts can contain any number of sector roots
const sectorRootCacheSize = 30

type (
contractChange struct {
id types.FileContractID
Expand Down Expand Up @@ -82,11 +89,45 @@ type (

processQueue chan uint64 // signals that the contract manager should process actions for a given block height

// caches the sector roots of contracts to avoid hitting the DB
// for frequently accessed contracts. The cache is limited to a
// small number of contracts to limit memory usage.
rootsCache *lru.TwoQueueCache[types.FileContractID, []types.Hash256]

mu sync.Mutex // guards the following fields
locks map[types.FileContractID]*locker // contracts must be locked while they are being modified
}
)

func (cm *ContractManager) getSectorRoots(id types.FileContractID, limit, offset int) ([]types.Hash256, error) {
// check the cache first
if roots, ok := cm.rootsCache.Get(id); ok {
if limit == 0 {
limit = len(roots)
}

if offset+limit > len(roots) {
return nil, errors.New("offset + limit exceeds length of roots")
}

// copy the roots into a new slice to avoid returning a slice of the
// cache's internal array
r := make([]types.Hash256, limit)
copy(r, roots[offset:offset+limit])
return r, nil
}

// if the cache doesn't have the roots, read them from the store
roots, err := cm.store.SectorRoots(id)
if err != nil {
return nil, fmt.Errorf("failed to get sector roots: %w", err)
}
// add the roots to the cache
cm.rootsCache.Add(id, roots)
// return the requested roots
return roots[offset : offset+limit], nil
}

// Lock locks a contract for modification.
func (cm *ContractManager) Lock(ctx context.Context, id types.FileContractID) (SignedRevision, error) {
ctx, cancel, err := cm.tg.AddContext(ctx)
Expand Down Expand Up @@ -201,14 +242,14 @@ func (cm *ContractManager) RenewContract(renewal SignedRevision, existing Signed
}

// SectorRoots returns the roots of all sectors stored by the contract.
func (cm *ContractManager) SectorRoots(id types.FileContractID, limit, offset uint64) ([]types.Hash256, error) {
func (cm *ContractManager) SectorRoots(id types.FileContractID, limit, offset int) ([]types.Hash256, error) {
done, err := cm.tg.Add()
if err != nil {
return nil, err
}
defer done()

return cm.store.SectorRoots(id, limit, offset)
return cm.getSectorRoots(id, limit, offset)
}

// ProcessConsensusChange applies a block update to the contract manager.
Expand Down Expand Up @@ -413,14 +454,15 @@ func (cm *ContractManager) ReviseContract(contractID types.FileContractID) (*Con
return nil, err
}

roots, err := cm.store.SectorRoots(contractID, 0, 0)
roots, err := cm.getSectorRoots(contractID, 0, 0)
if err != nil {
return nil, fmt.Errorf("failed to get sector roots: %w", err)
}
return &ContractUpdater{
store: cm.store,
log: cm.log.Named("contractUpdater"),

rootsCache: cm.rootsCache,
contractID: contractID,
sectors: uint64(len(roots)),
sectorRoots: roots,
Expand Down Expand Up @@ -460,6 +502,10 @@ func convertToCore(siad encoding.SiaMarshaler, core types.DecoderFrom) {

// NewManager creates a new contract manager.
func NewManager(store ContractStore, alerts Alerts, storage StorageManager, c ChainManager, tpool TransactionPool, wallet Wallet, log *zap.Logger) (*ContractManager, error) {
cache, err := lru.New2Q[types.FileContractID, []types.Hash256](sectorRootCacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create cache: %w", err)
}
cm := &ContractManager{
store: store,
tg: threadgroup.New(),
Expand All @@ -470,6 +516,8 @@ func NewManager(store ContractStore, alerts Alerts, storage StorageManager, c Ch
tpool: tpool,
wallet: wallet,

rootsCache: cache,

processQueue: make(chan uint64, 100),
locks: make(map[types.FileContractID]*locker),
}
Expand Down
2 changes: 1 addition & 1 deletion host/contracts/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type (
RenewContract(renewal SignedRevision, existing SignedRevision, formationSet []types.Transaction, lockedCollateral types.Currency, clearingUsage, initialUsage Usage, negotationHeight uint64) error
// SectorRoots returns the sector roots for a contract. If limit is 0, all roots
// are returned.
SectorRoots(id types.FileContractID, limit, offset uint64) ([]types.Hash256, error)
SectorRoots(id types.FileContractID) ([]types.Hash256, error)
// ContractAction calls contractFn on every contract in the store that
// needs a lifecycle action performed.
ContractAction(height uint64, contractFn func(types.FileContractID, uint64, string)) error
Expand Down
4 changes: 4 additions & 0 deletions host/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,10 @@ func (vm *VolumeManager) Write(root types.Hash256, data *[rhpv2.SectorSize]byte)
// AddTemporarySectors adds sectors to the temporary store. The sectors are not
// referenced by a contract and will be removed at the expiration height.
func (vm *VolumeManager) AddTemporarySectors(sectors []TempSector) error {
if len(sectors) == 0 {
return nil
}

done, err := vm.tg.Add()
if err != nil {
return err
Expand Down
Loading