Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy wallet utxo sync after broadcasting a tx #2258

Merged
merged 13 commits into from
Dec 27, 2023
13 changes: 8 additions & 5 deletions cmd/kaspawallet/daemon/server/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package server

import (
"context"
"github.com/pkg/errors"

"github.com/kaspanet/kaspad/cmd/kaspawallet/daemon/pb"
"github.com/kaspanet/kaspad/cmd/kaspawallet/libkaspawallet"
Expand All @@ -14,13 +15,15 @@ func (s *server) GetBalance(_ context.Context, _ *pb.GetBalanceRequest) (*pb.Get
s.lock.RLock()
defer s.lock.RUnlock()

if !s.isSynced() {
return nil, errors.Errorf("wallet daemon is not synced yet, %s", s.formatSyncStateReport())
}

dagInfo, err := s.rpcClient.GetBlockDAGInfo()
if err != nil {
return nil, err
}
daaScore := dagInfo.VirtualDAAScore
maturity := s.params.BlockCoinbaseMaturity

balancesMap := make(balancesMapType, 0)
for _, entry := range s.utxosSortedByAmount {
amount := entry.UTXOEntry.Amount()
Expand All @@ -30,7 +33,7 @@ func (s *server) GetBalance(_ context.Context, _ *pb.GetBalanceRequest) (*pb.Get
balances = new(balancesType)
balancesMap[address] = balances
}
if isUTXOSpendable(entry, daaScore, maturity) {
if s.isUTXOSpendable(entry, daaScore) {
balances.available += amount
} else {
balances.pending += amount
Expand Down Expand Up @@ -64,9 +67,9 @@ func (s *server) GetBalance(_ context.Context, _ *pb.GetBalanceRequest) (*pb.Get
}, nil
}

func isUTXOSpendable(entry *walletUTXO, virtualDAAScore uint64, coinbaseMaturity uint64) bool {
func (s *server) isUTXOSpendable(entry *walletUTXO, virtualDAAScore uint64) bool {
if !entry.UTXOEntry.IsCoinbase() {
return true
}
return entry.UTXOEntry.BlockDAAScore()+coinbaseMaturity < virtualDAAScore
return entry.UTXOEntry.BlockDAAScore()+s.coinbaseMaturity < virtualDAAScore
}
6 changes: 1 addition & 5 deletions cmd/kaspawallet/daemon/server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,7 @@ func (s *server) broadcast(transactions [][]byte, isDomain bool) ([]string, erro
}
}

err = s.refreshUTXOs()
if err != nil {
return nil, err
}

s.forceSync()
return txIDs, nil
}

Expand Down
31 changes: 14 additions & 17 deletions cmd/kaspawallet/daemon/server/create_unsigned_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@ package server
import (
"context"
"fmt"
"time"

"github.com/kaspanet/kaspad/cmd/kaspawallet/daemon/pb"
"github.com/kaspanet/kaspad/cmd/kaspawallet/libkaspawallet"
"github.com/kaspanet/kaspad/domain/consensus/utils/constants"
"github.com/kaspanet/kaspad/util"
"github.com/pkg/errors"
"golang.org/x/exp/slices"
)

// TODO: Implement a better fee estimation mechanism
Expand All @@ -35,24 +33,18 @@ func (s *server) createUnsignedTransactions(address string, amount uint64, isSen
if !s.isSynced() {
return nil, errors.Errorf("wallet daemon is not synced yet, %s", s.formatSyncStateReport())
}

// make sure address string is correct before proceeding to a
// potentially long UTXO refreshment operation
toAddress, err := util.DecodeAddress(address, s.params.Prefix)
if err != nil {
return nil, err
}

err = s.refreshUTXOs()
if err != nil {
return nil, err
}

var fromAddresses []*walletAddress
for _, from := range fromAddressesString {
fromAddress, exists := s.addressSet[from]
if !exists {
return nil, fmt.Errorf("Specified from address %s does not exists", from)
return nil, fmt.Errorf("specified from address %s does not exists", from)
}
fromAddresses = append(fromAddresses, fromAddress)
}
Expand Down Expand Up @@ -106,19 +98,14 @@ func (s *server) selectUTXOs(spendAmount uint64, isSendAll bool, feePerInput uin
return nil, 0, 0, err
}

coinbaseMaturity := s.params.BlockCoinbaseMaturity
if dagInfo.NetworkName == "kaspa-testnet-11" {
coinbaseMaturity = 1000
}

for _, utxo := range s.utxosSortedByAmount {
if (fromAddresses != nil && !slices.Contains(fromAddresses, utxo.address)) ||
!isUTXOSpendable(utxo, dagInfo.VirtualDAAScore, coinbaseMaturity) {
if (fromAddresses != nil && !walletAddressesContain(fromAddresses, utxo.address)) ||
!s.isUTXOSpendable(utxo, dagInfo.VirtualDAAScore) {
continue
}

if broadcastTime, ok := s.usedOutpoints[*utxo.Outpoint]; ok {
if time.Since(broadcastTime) > time.Minute {
if s.usedOutpointHasExpired(broadcastTime) {
delete(s.usedOutpoints, *utxo.Outpoint)
} else {
continue
Expand Down Expand Up @@ -160,3 +147,13 @@ func (s *server) selectUTXOs(spendAmount uint64, isSendAll bool, feePerInput uin

return selectedUTXOs, totalReceived, totalValue - totalSpend, nil
}

func walletAddressesContain(addresses []*walletAddress, contain *walletAddress) bool {
for _, address := range addresses {
if *address == *contain {
return true
}
}

return false
}
52 changes: 38 additions & 14 deletions cmd/kaspawallet/daemon/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package server

import (
"fmt"
"github.com/kaspanet/kaspad/version"
"net"
"os"
"sync"
"sync/atomic"
"time"

"github.com/kaspanet/kaspad/version"

"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"

"github.com/kaspanet/kaspad/util/txmass"
Expand All @@ -28,17 +30,22 @@ import (
type server struct {
pb.UnimplementedKaspawalletdServer

rpcClient *rpcclient.RPCClient
params *dagconfig.Params

lock sync.RWMutex
utxosSortedByAmount []*walletUTXO
nextSyncStartIndex uint32
keysFile *keys.File
shutdown chan struct{}
addressSet walletAddressSet
txMassCalculator *txmass.Calculator
usedOutpoints map[externalapi.DomainOutpoint]time.Time
rpcClient *rpcclient.RPCClient // RPC client for ongoing user requests
backgroundRPCClient *rpcclient.RPCClient // RPC client dedicated for address and UTXO background fetching
params *dagconfig.Params
coinbaseMaturity uint64 // Is different from default if we use testnet-11

lock sync.RWMutex
utxosSortedByAmount []*walletUTXO
nextSyncStartIndex uint32
keysFile *keys.File
shutdown chan struct{}
forceSyncChan chan struct{}
startTimeOfLastCompletedRefresh time.Time
addressSet walletAddressSet
txMassCalculator *txmass.Calculator
usedOutpoints map[externalapi.DomainOutpoint]time.Time
firstSyncDone atomic.Bool

isLogFinalProgressLineShown bool
maxUsedAddressesForLog uint32
Expand Down Expand Up @@ -72,6 +79,10 @@ func Start(params *dagconfig.Params, listen, rpcServer string, keysFilePath stri
if err != nil {
return (errors.Wrapf(err, "Error connecting to RPC server %s", rpcServer))
}
backgroundRPCClient, err := connectToRPC(params, rpcServer, timeout)
if err != nil {
return (errors.Wrapf(err, "Error making a second connection to RPC server %s", rpcServer))
}

log.Infof("Connected, reading keys file %s...", keysFilePath)
keysFile, err := keys.ReadKeysFile(params, keysFilePath)
Expand All @@ -84,13 +95,26 @@ func Start(params *dagconfig.Params, listen, rpcServer string, keysFilePath stri
return err
}

dagInfo, err := rpcClient.GetBlockDAGInfo()
if err != nil {
return nil
}

coinbaseMaturity := params.BlockCoinbaseMaturity
if dagInfo.NetworkName == "kaspa-testnet-11" {
coinbaseMaturity = 1000
}

serverInstance := &server{
rpcClient: rpcClient,
backgroundRPCClient: backgroundRPCClient,
params: params,
coinbaseMaturity: coinbaseMaturity,
utxosSortedByAmount: []*walletUTXO{},
nextSyncStartIndex: 0,
keysFile: keysFile,
shutdown: make(chan struct{}),
forceSyncChan: make(chan struct{}),
addressSet: make(walletAddressSet),
txMassCalculator: txmass.NewCalculator(params.MassPerTxByte, params.MassPerScriptPubKeyByte, params.MassPerSigOp),
usedOutpoints: map[externalapi.DomainOutpoint]time.Time{},
Expand All @@ -100,8 +124,8 @@ func Start(params *dagconfig.Params, listen, rpcServer string, keysFilePath stri
}

log.Infof("Read, syncing the wallet...")
spawn("serverInstance.sync", func() {
err := serverInstance.sync()
spawn("serverInstance.syncLoop", func() {
err := serverInstance.syncLoop()
if err != nil {
printErrorAndExit(errors.Wrap(err, "error syncing the wallet"))
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/kaspawallet/daemon/server/split_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func (s *server) moreUTXOsForMergeTransaction(alreadySelectedUTXOs []*libkaspawa
if _, ok := alreadySelectedUTXOsMap[*utxo.Outpoint]; ok {
continue
}
if !isUTXOSpendable(utxo, dagInfo.VirtualDAAScore, s.params.BlockCoinbaseMaturity) {
if !s.isUTXOSpendable(utxo, dagInfo.VirtualDAAScore) {
continue
}
additionalUTXOs = append(additionalUTXOs, &libkaspawallet.UTXO{
Expand Down
Loading
Loading