Skip to content

Commit

Permalink
Lazy wallet utxo sync after broadcasting a tx (#2258)
Browse files Browse the repository at this point in the history
* Lazy wallet utxo sync after broadcasting a tx

* Make a more granular lock for refreshUTXOs

* Don't push to forceSyncChan if it has an element

* Better policy for used outpoints and wait for first sync when creating an unsigned tx

* fix expire condition

* lock address reading

* fix small memory leak

* add an rpc client dedicated for background ops

* rename to follow conventions

* one more rename

* Compare wallet addresses by value

* small fixes

* Add comment

---------

Co-authored-by: Michael Sutton <[email protected]>
  • Loading branch information
someone235 and michaelsutton authored Dec 27, 2023
1 parent 629faa8 commit d2453f8
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 68 deletions.
13 changes: 8 additions & 5 deletions cmd/kaspawallet/daemon/server/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"context"
"github.com/pkg/errors"

"github.com/kaspanet/kaspad/cmd/kaspawallet/daemon/pb"
"github.com/kaspanet/kaspad/cmd/kaspawallet/libkaspawallet"
Expand All @@ -14,13 +15,15 @@ func (s *server) GetBalance(_ context.Context, _ *pb.GetBalanceRequest) (*pb.Get
s.lock.RLock()
defer s.lock.RUnlock()

if !s.isSynced() {
return nil, errors.Errorf("wallet daemon is not synced yet, %s", s.formatSyncStateReport())
}

dagInfo, err := s.rpcClient.GetBlockDAGInfo()
if err != nil {
return nil, err
}
daaScore := dagInfo.VirtualDAAScore
maturity := s.params.BlockCoinbaseMaturity

balancesMap := make(balancesMapType, 0)
for _, entry := range s.utxosSortedByAmount {
amount := entry.UTXOEntry.Amount()
Expand All @@ -30,7 +33,7 @@ func (s *server) GetBalance(_ context.Context, _ *pb.GetBalanceRequest) (*pb.Get
balances = new(balancesType)
balancesMap[address] = balances
}
if isUTXOSpendable(entry, daaScore, maturity) {
if s.isUTXOSpendable(entry, daaScore) {
balances.available += amount
} else {
balances.pending += amount
Expand Down Expand Up @@ -64,9 +67,9 @@ func (s *server) GetBalance(_ context.Context, _ *pb.GetBalanceRequest) (*pb.Get
}, nil
}

func isUTXOSpendable(entry *walletUTXO, virtualDAAScore uint64, coinbaseMaturity uint64) bool {
func (s *server) isUTXOSpendable(entry *walletUTXO, virtualDAAScore uint64) bool {
if !entry.UTXOEntry.IsCoinbase() {
return true
}
return entry.UTXOEntry.BlockDAAScore()+coinbaseMaturity < virtualDAAScore
return entry.UTXOEntry.BlockDAAScore()+s.coinbaseMaturity < virtualDAAScore
}
6 changes: 1 addition & 5 deletions cmd/kaspawallet/daemon/server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,7 @@ func (s *server) broadcast(transactions [][]byte, isDomain bool) ([]string, erro
}
}

err = s.refreshUTXOs()
if err != nil {
return nil, err
}

s.forceSync()
return txIDs, nil
}

Expand Down
31 changes: 14 additions & 17 deletions cmd/kaspawallet/daemon/server/create_unsigned_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ package server
import (
"context"
"fmt"
"time"

"github.com/kaspanet/kaspad/cmd/kaspawallet/daemon/pb"
"github.com/kaspanet/kaspad/cmd/kaspawallet/libkaspawallet"
"github.com/kaspanet/kaspad/domain/consensus/utils/constants"
"github.com/kaspanet/kaspad/util"
"github.com/pkg/errors"
"golang.org/x/exp/slices"
)

// TODO: Implement a better fee estimation mechanism
Expand All @@ -35,24 +33,18 @@ func (s *server) createUnsignedTransactions(address string, amount uint64, isSen
if !s.isSynced() {
return nil, errors.Errorf("wallet daemon is not synced yet, %s", s.formatSyncStateReport())
}

// make sure address string is correct before proceeding to a
// potentially long UTXO refreshment operation
toAddress, err := util.DecodeAddress(address, s.params.Prefix)
if err != nil {
return nil, err
}

err = s.refreshUTXOs()
if err != nil {
return nil, err
}

var fromAddresses []*walletAddress
for _, from := range fromAddressesString {
fromAddress, exists := s.addressSet[from]
if !exists {
return nil, fmt.Errorf("Specified from address %s does not exists", from)
return nil, fmt.Errorf("specified from address %s does not exists", from)
}
fromAddresses = append(fromAddresses, fromAddress)
}
Expand Down Expand Up @@ -106,19 +98,14 @@ func (s *server) selectUTXOs(spendAmount uint64, isSendAll bool, feePerInput uin
return nil, 0, 0, err
}

coinbaseMaturity := s.params.BlockCoinbaseMaturity
if dagInfo.NetworkName == "kaspa-testnet-11" {
coinbaseMaturity = 1000
}

for _, utxo := range s.utxosSortedByAmount {
if (fromAddresses != nil && !slices.Contains(fromAddresses, utxo.address)) ||
!isUTXOSpendable(utxo, dagInfo.VirtualDAAScore, coinbaseMaturity) {
if (fromAddresses != nil && !walletAddressesContain(fromAddresses, utxo.address)) ||
!s.isUTXOSpendable(utxo, dagInfo.VirtualDAAScore) {
continue
}

if broadcastTime, ok := s.usedOutpoints[*utxo.Outpoint]; ok {
if time.Since(broadcastTime) > time.Minute {
if s.usedOutpointHasExpired(broadcastTime) {
delete(s.usedOutpoints, *utxo.Outpoint)
} else {
continue
Expand Down Expand Up @@ -160,3 +147,13 @@ func (s *server) selectUTXOs(spendAmount uint64, isSendAll bool, feePerInput uin

return selectedUTXOs, totalReceived, totalValue - totalSpend, nil
}

func walletAddressesContain(addresses []*walletAddress, contain *walletAddress) bool {
for _, address := range addresses {
if *address == *contain {
return true
}
}

return false
}
52 changes: 38 additions & 14 deletions cmd/kaspawallet/daemon/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package server

import (
"fmt"
"github.com/kaspanet/kaspad/version"
"net"
"os"
"sync"
"sync/atomic"
"time"

"github.com/kaspanet/kaspad/version"

"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"

"github.com/kaspanet/kaspad/util/txmass"
Expand All @@ -28,17 +30,22 @@ import (
type server struct {
pb.UnimplementedKaspawalletdServer

rpcClient *rpcclient.RPCClient
params *dagconfig.Params

lock sync.RWMutex
utxosSortedByAmount []*walletUTXO
nextSyncStartIndex uint32
keysFile *keys.File
shutdown chan struct{}
addressSet walletAddressSet
txMassCalculator *txmass.Calculator
usedOutpoints map[externalapi.DomainOutpoint]time.Time
rpcClient *rpcclient.RPCClient // RPC client for ongoing user requests
backgroundRPCClient *rpcclient.RPCClient // RPC client dedicated for address and UTXO background fetching
params *dagconfig.Params
coinbaseMaturity uint64 // Is different from default if we use testnet-11

lock sync.RWMutex
utxosSortedByAmount []*walletUTXO
nextSyncStartIndex uint32
keysFile *keys.File
shutdown chan struct{}
forceSyncChan chan struct{}
startTimeOfLastCompletedRefresh time.Time
addressSet walletAddressSet
txMassCalculator *txmass.Calculator
usedOutpoints map[externalapi.DomainOutpoint]time.Time
firstSyncDone atomic.Bool

isLogFinalProgressLineShown bool
maxUsedAddressesForLog uint32
Expand Down Expand Up @@ -72,6 +79,10 @@ func Start(params *dagconfig.Params, listen, rpcServer string, keysFilePath stri
if err != nil {
return (errors.Wrapf(err, "Error connecting to RPC server %s", rpcServer))
}
backgroundRPCClient, err := connectToRPC(params, rpcServer, timeout)
if err != nil {
return (errors.Wrapf(err, "Error making a second connection to RPC server %s", rpcServer))
}

log.Infof("Connected, reading keys file %s...", keysFilePath)
keysFile, err := keys.ReadKeysFile(params, keysFilePath)
Expand All @@ -84,13 +95,26 @@ func Start(params *dagconfig.Params, listen, rpcServer string, keysFilePath stri
return err
}

dagInfo, err := rpcClient.GetBlockDAGInfo()
if err != nil {
return nil
}

coinbaseMaturity := params.BlockCoinbaseMaturity
if dagInfo.NetworkName == "kaspa-testnet-11" {
coinbaseMaturity = 1000
}

serverInstance := &server{
rpcClient: rpcClient,
backgroundRPCClient: backgroundRPCClient,
params: params,
coinbaseMaturity: coinbaseMaturity,
utxosSortedByAmount: []*walletUTXO{},
nextSyncStartIndex: 0,
keysFile: keysFile,
shutdown: make(chan struct{}),
forceSyncChan: make(chan struct{}),
addressSet: make(walletAddressSet),
txMassCalculator: txmass.NewCalculator(params.MassPerTxByte, params.MassPerScriptPubKeyByte, params.MassPerSigOp),
usedOutpoints: map[externalapi.DomainOutpoint]time.Time{},
Expand All @@ -100,8 +124,8 @@ func Start(params *dagconfig.Params, listen, rpcServer string, keysFilePath stri
}

log.Infof("Read, syncing the wallet...")
spawn("serverInstance.sync", func() {
err := serverInstance.sync()
spawn("serverInstance.syncLoop", func() {
err := serverInstance.syncLoop()
if err != nil {
printErrorAndExit(errors.Wrap(err, "error syncing the wallet"))
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/kaspawallet/daemon/server/split_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (s *server) moreUTXOsForMergeTransaction(alreadySelectedUTXOs []*libkaspawa
if _, ok := alreadySelectedUTXOsMap[*utxo.Outpoint]; ok {
continue
}
if !isUTXOSpendable(utxo, dagInfo.VirtualDAAScore, s.params.BlockCoinbaseMaturity) {
if !s.isUTXOSpendable(utxo, dagInfo.VirtualDAAScore) {
continue
}
additionalUTXOs = append(additionalUTXOs, &libkaspawallet.UTXO{
Expand Down
Loading

0 comments on commit d2453f8

Please sign in to comment.