Initial work to add an in-memory layer on top of the TxStore #11190

Closed
wants to merge 31 commits into from
Commits
c53bb4c
initial commit for inmemory implementation
poopoothegorilla Oct 31, 2023
b75d260
some cleanup
poopoothegorilla Nov 1, 2023
41a572d
Merge branch 'develop' into jtw/initial-in-memory-work
poopoothegorilla Nov 1, 2023
3dde323
fix tests
poopoothegorilla Nov 2, 2023
201a2dd
add some functions for the in memory store
poopoothegorilla Nov 6, 2023
0c1d6cf
Merge branch 'develop' into jtw/initial-in-memory-work
poopoothegorilla Nov 6, 2023
b565d45
implement a few more methods for the in memory store
poopoothegorilla Nov 6, 2023
faf670f
some clean up
poopoothegorilla Nov 7, 2023
178b0f0
add check for inprogress txn
poopoothegorilla Nov 7, 2023
fe25a26
make some changes to testing
poopoothegorilla Nov 7, 2023
a0ac2a3
reorganize tests
poopoothegorilla Nov 7, 2023
a28aa80
start initialization function; add priority queue system; change stor…
poopoothegorilla Nov 8, 2023
e2f3a87
add initialization to address state
poopoothegorilla Nov 14, 2023
91bea75
change loops to not use range
poopoothegorilla Nov 14, 2023
5314284
clean up idempotency key location
poopoothegorilla Nov 15, 2023
5a908be
add find latest sequence
poopoothegorilla Nov 15, 2023
9872283
remove FindLatestSequence from persistent interface
poopoothegorilla Nov 15, 2023
df5e946
add UnconfirmedTransactions to TxSoreWebApi
poopoothegorilla Nov 15, 2023
fcb9021
Merge branch 'develop' into jtw/initial-in-memory-work
poopoothegorilla Dec 11, 2023
5d78868
address comments
poopoothegorilla Dec 11, 2023
a0a1a79
fix tests
poopoothegorilla Dec 11, 2023
89d1056
fix bug in tests
poopoothegorilla Dec 11, 2023
3663bd0
run go generate
poopoothegorilla Dec 12, 2023
17cd10d
Merge branch 'develop' into jtw/initial-in-memory-work
poopoothegorilla Dec 12, 2023
54b7ecc
address comment
poopoothegorilla Dec 12, 2023
f8e0175
update inmemory store to implement txstore interface
poopoothegorilla Dec 12, 2023
df1a11a
cleanup tests
poopoothegorilla Dec 12, 2023
5452d02
Merge branch 'develop' into jtw/initial-in-memory-work
poopoothegorilla Dec 12, 2023
5f0f6da
Merge branch 'develop' into jtw/initial-in-memory-work
poopoothegorilla Dec 14, 2023
f24eea7
add deepcopy
poopoothegorilla Dec 14, 2023
325fb12
remove testing since this will be in future work
poopoothegorilla Jan 5, 2024
307 changes: 307 additions & 0 deletions common/txmgr/address_state.go
@@ -0,0 +1,307 @@
package txmgr

import (
"context"
"fmt"
"sync"

feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
"gopkg.in/guregu/null.v4"
)

// AddressState is the state of a given from address
type AddressState[
CHAIN_ID types.ID,
ADDR, TX_HASH, BLOCK_HASH types.Hashable,
R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
SEQ types.Sequence,
FEE feetypes.Fee,
] struct {
chainID CHAIN_ID
fromAddress ADDR
txStore PersistentTxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]

sync.RWMutex
idempotencyKeyToTx map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
unstarted *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
inprogress *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
// NOTE: currently the unconfirmed map's key is the transaction ID that is assigned via the postgres DB
unconfirmed map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
}

// NewAddressState returns a new AddressState instance
func NewAddressState[
CHAIN_ID types.ID,
ADDR, TX_HASH, BLOCK_HASH types.Hashable,
R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
SEQ types.Sequence,
FEE feetypes.Fee,
](
chainID CHAIN_ID,
fromAddress ADDR,
maxUnstarted int,
txStore PersistentTxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
) (*AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], error) {
as := AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
chainID: chainID,
fromAddress: fromAddress,
txStore: txStore,

idempotencyKeyToTx: map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{},
unstarted: NewTxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](maxUnstarted),
inprogress: nil,
unconfirmed: map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{},
}

as.Lock()
defer as.Unlock()

// Load all unstarted transactions from persistent storage
offset := 0
limit := 50
for {
txs, count, err := txStore.UnstartedTransactions(offset, limit, as.fromAddress, as.chainID)
Collaborator

Is there a guarantee that two different instances of core won't run simultaneously on top of a single DB? For example, during maintenance or an update of the node.

Contributor Author

No, I don't think there is. What scenarios are you thinking of?

Collaborator

In theory, another node might persist new txs after our node has fetched them.

if err != nil {
return nil, fmt.Errorf("address_state: initialization: %w", err)
}
for i := 0; i < len(txs); i++ {
tx := txs[i]
as.unstarted.AddTx(&tx)
if tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx
}
}
if count <= offset+limit {
break
}
offset += limit
}

// Load all in progress transactions from persistent storage
ctx := context.Background()
tx, err := txStore.GetTxInProgress(ctx, as.fromAddress)
if err != nil {
return nil, fmt.Errorf("address_state: initialization: %w", err)
}
as.inprogress = tx
if tx != nil && tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = tx
}

// Load all unconfirmed transactions from persistent storage
offset = 0
limit = 50
Contributor

@DylanTinianov DylanTinianov Jan 17, 2024

Nit: Could make the load limit a const. Would be easier to change in a single place if needed.
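
A minimal sketch of that suggestion, assuming a shared constant plus a generic paging helper. The names `loadPageSize`, `fetchPage`, `loadAll`, and `fakeFetch` are hypothetical and not part of this PR; `fetchPage` only mirrors the `(items, total, err)` shape that the paged store calls in this function appear to return.

```go
package main

import "fmt"

// loadPageSize is a hypothetical constant replacing the two `limit := 50` literals.
const loadPageSize = 50

// fetchPage is a hypothetical stand-in for a paged store call. It returns one
// page of items plus the total number of matching items.
type fetchPage[T any] func(offset, limit int) (items []T, total int, err error)

// loadAll pages through fetch until the reported total has been covered.
func loadAll[T any](fetch fetchPage[T]) ([]T, error) {
	var all []T
	for offset := 0; ; offset += loadPageSize {
		items, total, err := fetch(offset, loadPageSize)
		if err != nil {
			return nil, fmt.Errorf("load all: %w", err)
		}
		all = append(all, items...)
		if total <= offset+loadPageSize {
			return all, nil
		}
	}
}

// fakeFetch serves the integers [0, n) in pages. It exists only for the demo.
func fakeFetch(n int) fetchPage[int] {
	return func(offset, limit int) ([]int, int, error) {
		end := offset + limit
		if end > n {
			end = n
		}
		page := []int{}
		for i := offset; i < end; i++ {
			page = append(page, i)
		}
		return page, n, nil
	}
}

func main() {
	all, err := loadAll[int](fakeFetch(120))
	fmt.Println(len(all), err) // prints "120 <nil>"
}
```

With a helper like this, the unstarted and unconfirmed loading loops could share one loop body and one page size, so a change to the limit would happen in a single place.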

for {
txs, count, err := txStore.UnconfirmedTransactions(offset, limit, as.fromAddress, as.chainID)
if err != nil {
return nil, fmt.Errorf("address_state: initialization: %w", err)
}
for i := 0; i < len(txs); i++ {
tx := txs[i]
as.unconfirmed[tx.ID] = &tx
if tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx
}
}
if count <= offset+limit {
break
}
offset += limit
}

return &as, nil

}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) close() {
as.Lock()
defer as.Unlock()

as.unstarted.Close()
as.unstarted = nil
as.inprogress = nil
clear(as.unconfirmed)
clear(as.idempotencyKeyToTx)
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) unstartedCount() int {
as.RLock()
defer as.RUnlock()

return as.unstarted.Len()
}
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) unconfirmedCount() int {
as.RLock()
defer as.RUnlock()

return len(as.unconfirmed)
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
as.RLock()
defer as.RUnlock()

return as.idempotencyKeyToTx[key]
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findLatestSequence() SEQ {
as.RLock()
defer as.RUnlock()

var maxSeq SEQ
if as.inprogress != nil && as.inprogress.Sequence != nil {
if (*as.inprogress.Sequence).Int64() > maxSeq.Int64() {
Collaborator

Probably out of scope for this PR, but should we use a comparable restriction for SEQ instead of relying on the EVM-specific property Int64?

Contributor Author

@poopoothegorilla poopoothegorilla Nov 17, 2023

I would say yes, let's add that in.
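
One possible shape for that restriction, as a sketch only. Go's built-in `comparable` constraint covers `==` and `!=` but not ordering, so this example assumes a hypothetical `OrderedSequence` constraint with a `Compare` method. `OrderedSequence`, `evmNonce`, and `maxSequence` are illustrative names, not the actual `types.Sequence` interface.

```go
package main

import "fmt"

// OrderedSequence is a hypothetical constraint: a sequence that can order
// itself against another value of the same type, without exposing Int64.
type OrderedSequence[S any] interface {
	// Compare returns a negative number, zero, or a positive number when the
	// receiver is less than, equal to, or greater than other.
	Compare(other S) int
}

// evmNonce is a hypothetical EVM-style sequence backed by int64.
type evmNonce int64

func (n evmNonce) Compare(other evmNonce) int {
	switch {
	case n < other:
		return -1
	case n > other:
		return 1
	default:
		return 0
	}
}

// maxSequence returns the largest sequence, or ok == false if none were given.
func maxSequence[S OrderedSequence[S]](seqs ...S) (best S, ok bool) {
	for i, s := range seqs {
		if i == 0 || s.Compare(best) > 0 {
			best = s
		}
	}
	return best, len(seqs) > 0
}

func main() {
	latest, ok := maxSequence(evmNonce(3), evmNonce(9), evmNonce(4))
	fmt.Println(latest, ok) // prints "9 true"
}
```

Returning an explicit `ok` also avoids treating the zero value of `SEQ` as a real sequence when an address has no in-progress or unconfirmed transactions.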

maxSeq = *as.inprogress.Sequence
}
}
for _, tx := range as.unconfirmed {
if tx.Sequence == nil {
continue
}
if (*tx.Sequence).Int64() > maxSeq.Int64() {
maxSeq = *tx.Sequence
}
}

return maxSeq
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
as.RLock()
defer as.RUnlock()

tx := as.unstarted.PeekNextTx()
if tx == nil {
return nil, fmt.Errorf("peek_next_unstarted_tx: %w (address: %s)", ErrTxnNotFound, as.fromAddress)
}

return tx, nil
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
as.RLock()
defer as.RUnlock()

tx := as.inprogress
if tx == nil {
return nil, fmt.Errorf("peek_in_progress_tx: %w (address: %s)", ErrTxnNotFound, as.fromAddress)
}

return tx, nil
Collaborator

Sharing a pointer to tx feels extremely unsafe. Should we implement marshal/unmarshal when receiving/returning an instance?

Contributor Author

I agree with this 💯. We should think of ways to do this safely.
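
One way to do this without a marshal/unmarshal round trip is to hand out a deep copy. The sketch below uses a simplified, hypothetical `tx` struct rather than the real `txmgrtypes.Tx`, and the `deepCopy` method is illustrative; the commit list mentions "add deepcopy", but that implementation is not shown here.

```go
package main

import "fmt"

// attempt and tx are simplified, hypothetical stand-ins for the real types.
type attempt struct {
	ID   int64
	Hash string
}

type tx struct {
	ID       int64
	Sequence *int64
	Attempts []attempt
}

// deepCopy returns a copy that shares no pointers or slices with the original,
// so callers can mutate the result without touching the in-memory store.
func (t *tx) deepCopy() *tx {
	if t == nil {
		return nil
	}
	cp := *t // copies value fields; pointer and slice fields still alias
	if t.Sequence != nil {
		seq := *t.Sequence
		cp.Sequence = &seq
	}
	cp.Attempts = append([]attempt(nil), t.Attempts...)
	return &cp
}

func main() {
	seq := int64(7)
	stored := &tx{ID: 1, Sequence: &seq, Attempts: []attempt{{ID: 10, Hash: "0xabc"}}}

	returned := stored.deepCopy()
	*returned.Sequence = 99
	returned.Attempts[0].Hash = "0xdef"

	fmt.Println(*stored.Sequence, stored.Attempts[0].Hash) // prints "7 0xabc"
}
```

The trade-off is an allocation per read, in exchange for keeping every mutation of stored transactions behind the `AddressState` lock.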

}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
as.Lock()
defer as.Unlock()

if as.unstarted.Len() >= as.unstarted.Cap() {
return fmt.Errorf("add_tx_to_unstarted: address %s unstarted queue capacity has been reached", as.fromAddress)
}

as.unstarted.AddTx(tx)
if tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = tx
}

return nil
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnstartedToInProgress(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
as.Lock()
defer as.Unlock()

if as.inprogress != nil {
return fmt.Errorf("move_unstarted_to_in_progress: address %s already has a transaction in progress", as.fromAddress)
}

if tx != nil {
Collaborator

In which cases does the caller know better which tx to move?

Contributor Author

Could you reword this statement? I am not sure I understand.

Collaborator

moveUnstartedToInProgress accepts tx as an argument but allows it to be nil, in which case we get the tx from the priority queue. In which cases might the caller of moveUnstartedToInProgress want to use their own value of tx instead of the one from the queue?

Contributor Author

My reasoning was to minimize potential conflicts between the persistent store and the in-memory state, so in this case the caller gets the next unstarted tx from the DB. I think we should move away from this in the future, though. It might make sense to exclude it until we are ready to use it.

// if tx is not nil then remove the tx from the unstarted queue
// TODO(jtw): what should be the unique identifier for each transaction? ID is being set by the postgres DB
tx = as.unstarted.RemoveTxByID(tx.ID)
} else {
// if tx is nil then pop the next unstarted transaction
tx = as.unstarted.RemoveNextTx()
}
if tx == nil {
return fmt.Errorf("move_unstarted_to_in_progress: no unstarted transaction to move to in_progress")
}
tx.State = TxInProgress
as.inprogress = tx

return nil
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveInProgressToUnconfirmed(
txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
) error {
as.Lock()
defer as.Unlock()

tx := as.inprogress
if tx == nil {
return fmt.Errorf("move_in_progress_to_unconfirmed: no transaction in progress")
}
tx.State = TxUnconfirmed

var found bool
for i := 0; i < len(tx.TxAttempts); i++ {
if tx.TxAttempts[i].ID == txAttempt.ID {
tx.TxAttempts[i] = txAttempt
found = true
Collaborator

break?

Contributor Author

good catch! thanks for spotting

break
}
}
if !found {
// NOTE(jtw): this would mean that the TxAttempt did not exist for the Tx
// NOTE(jtw): should this log a warning?
// NOTE(jtw): can this happen?
tx.TxAttempts = append(tx.TxAttempts, txAttempt)
}

as.unconfirmed[tx.ID] = tx
as.inprogress = nil

return nil
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) abandon() {
as.Lock()
defer as.Unlock()

for as.unstarted.Len() > 0 {
tx := as.unstarted.RemoveNextTx()
abandon[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](tx)
}

if as.inprogress != nil {
tx := as.inprogress
abandon[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](tx)
as.inprogress = nil
}
for _, tx := range as.unconfirmed {
abandon[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](tx)
}
for _, tx := range as.idempotencyKeyToTx {
abandon[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](tx)
}

clear(as.unconfirmed)
}

func abandon[
CHAIN_ID types.ID,
ADDR, TX_HASH, BLOCK_HASH types.Hashable,
R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH],
SEQ types.Sequence,
FEE feetypes.Fee,
](tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
if tx == nil {
return
}

tx.State = TxFatalError
tx.Sequence = nil
tx.Error = null.NewString("abandoned", true)
}
2 changes: 1 addition & 1 deletion common/txmgr/broadcaster.go
@@ -698,7 +698,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next
defer cancel()
etx := &txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}
if err := eb.txStore.FindNextUnstartedTransactionFromAddress(ctx, etx, fromAddress, eb.chainID); err != nil {
- if errors.Is(err, sql.ErrNoRows) {
+ if errors.Is(err, sql.ErrNoRows) || errors.Is(err, ErrTxnNotFound) {
Collaborator

It feels weird to force users of txStore to check for two different errors that actually mean the same thing. Wouldn't it be easier to return sql.ErrNoRows from the in-memory storage?

Contributor Author

I want to get rid of the reliance on sql.ErrNoRows, so this is the first step. Relying on it forces us to depend on the sql pkg. I think we should use our own error types so we can reduce that dependency.
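
A rough sketch of where that could lead, assuming the persistent store translates `sql.ErrNoRows` at its own boundary so callers only check one sentinel. The definition of `ErrTxnNotFound` below is an assumption (the PR references it, but its definition is not shown), and `findInSQL`, `translateNotFound`, and `isNotFound` are hypothetical helpers.

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
)

// ErrTxnNotFound is assumed to be a plain sentinel error; the real definition
// is not part of the diff shown here.
var ErrTxnNotFound = errors.New("transaction not found")

// findInSQL is a hypothetical persistent-store lookup that fails the way a
// database/sql query does when no row matches.
func findInSQL(found bool) error {
	if found {
		return nil
	}
	return fmt.Errorf("find next unstarted tx: %w", sql.ErrNoRows)
}

// translateNotFound maps sql.ErrNoRows to ErrTxnNotFound at the store
// boundary. Using %v for the cause keeps the message but drops sql.ErrNoRows
// from the error chain, so callers no longer depend on database/sql.
func translateNotFound(err error) error {
	if errors.Is(err, sql.ErrNoRows) {
		return fmt.Errorf("%w: %v", ErrTxnNotFound, err)
	}
	return err
}

// isNotFound is the single check a caller such as the broadcaster would need.
func isNotFound(err error) bool {
	return errors.Is(err, ErrTxnNotFound)
}

func main() {
	err := translateNotFound(findInSQL(false))
	fmt.Println(isNotFound(err), errors.Is(err, sql.ErrNoRows)) // prints "true false"
}
```

Under this assumption, both the SQL-backed and in-memory stores would return `ErrTxnNotFound`, and the double check in the broadcaster could later collapse to one.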

// Finish. No more transactions left to process. Hoorah!
return nil, nil
}