-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial work to add an in-memory layer on top of the TxStore #11190
Changes from all commits
c53bb4c
b75d260
41a572d
3dde323
201a2dd
0c1d6cf
b565d45
faf670f
178b0f0
fe25a26
a0ac2a3
a28aa80
e2f3a87
91bea75
5314284
5a908be
9872283
df5e946
fcb9021
5d78868
a0a1a79
89d1056
3663bd0
17cd10d
54b7ecc
f8e0175
df1a11a
5452d02
5f0f6da
f24eea7
325fb12
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,307 @@ | ||
package txmgr | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
|
||
feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" | ||
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" | ||
"github.com/smartcontractkit/chainlink/v2/common/types" | ||
"gopkg.in/guregu/null.v4" | ||
) | ||
|
||
// AddressState is the state of a given from address | ||
type AddressState[ | ||
CHAIN_ID types.ID, | ||
ADDR, TX_HASH, BLOCK_HASH types.Hashable, | ||
R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], | ||
SEQ types.Sequence, | ||
FEE feetypes.Fee, | ||
] struct { | ||
chainID CHAIN_ID | ||
fromAddress ADDR | ||
txStore PersistentTxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] | ||
|
||
sync.RWMutex | ||
idempotencyKeyToTx map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] | ||
unstarted *TxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] | ||
inprogress *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] | ||
// NOTE: currently the unconfirmed map's key is the transaction ID that is assigned via the postgres DB | ||
unconfirmed map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] | ||
} | ||
|
||
// NewAddressState returns a new AddressState instance | ||
func NewAddressState[ | ||
CHAIN_ID types.ID, | ||
ADDR, TX_HASH, BLOCK_HASH types.Hashable, | ||
R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], | ||
SEQ types.Sequence, | ||
FEE feetypes.Fee, | ||
]( | ||
chainID CHAIN_ID, | ||
fromAddress ADDR, | ||
maxUnstarted int, | ||
txStore PersistentTxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE], | ||
) (*AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE], error) { | ||
as := AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{ | ||
chainID: chainID, | ||
fromAddress: fromAddress, | ||
txStore: txStore, | ||
|
||
idempotencyKeyToTx: map[string]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}, | ||
unstarted: NewTxPriorityQueue[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](maxUnstarted), | ||
inprogress: nil, | ||
unconfirmed: map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{}, | ||
} | ||
|
||
as.Lock() | ||
defer as.Unlock() | ||
|
||
// Load all unstarted transactions from persistent storage | ||
offset := 0 | ||
limit := 50 | ||
for { | ||
txs, count, err := txStore.UnstartedTransactions(offset, limit, as.fromAddress, as.chainID) | ||
if err != nil { | ||
return nil, fmt.Errorf("address_state: initialization: %w", err) | ||
} | ||
for i := 0; i < len(txs); i++ { | ||
tx := txs[i] | ||
as.unstarted.AddTx(&tx) | ||
if tx.IdempotencyKey != nil { | ||
as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx | ||
} | ||
} | ||
if count <= offset+limit { | ||
break | ||
} | ||
offset += limit | ||
} | ||
|
||
// Load all in progress transactions from persistent storage | ||
ctx := context.Background() | ||
tx, err := txStore.GetTxInProgress(ctx, as.fromAddress) | ||
if err != nil { | ||
return nil, fmt.Errorf("address_state: initialization: %w", err) | ||
} | ||
as.inprogress = tx | ||
if tx != nil && tx.IdempotencyKey != nil { | ||
as.idempotencyKeyToTx[*tx.IdempotencyKey] = tx | ||
} | ||
|
||
// Load all unconfirmed transactions from persistent storage | ||
offset = 0 | ||
limit = 50 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: Could make the load limit a const. Would be easier to change in a single place if needed. |
||
for { | ||
txs, count, err := txStore.UnconfirmedTransactions(offset, limit, as.fromAddress, as.chainID) | ||
if err != nil { | ||
return nil, fmt.Errorf("address_state: initialization: %w", err) | ||
} | ||
for i := 0; i < len(txs); i++ { | ||
tx := txs[i] | ||
as.unconfirmed[tx.ID] = &tx | ||
if tx.IdempotencyKey != nil { | ||
as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx | ||
} | ||
} | ||
if count <= offset+limit { | ||
break | ||
} | ||
offset += limit | ||
} | ||
|
||
return &as, nil | ||
|
||
} | ||
|
||
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) close() { | ||
as.Lock() | ||
defer as.Unlock() | ||
|
||
as.unstarted.Close() | ||
as.unstarted = nil | ||
as.inprogress = nil | ||
clear(as.unconfirmed) | ||
clear(as.idempotencyKeyToTx) | ||
} | ||
|
||
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) unstartedCount() int { | ||
as.RLock() | ||
defer as.RUnlock() | ||
|
||
return as.unstarted.Len() | ||
} | ||
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) unconfirmedCount() int { | ||
as.RLock() | ||
defer as.RUnlock() | ||
|
||
return len(as.unconfirmed) | ||
} | ||
|
||
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findTxWithIdempotencyKey(key string) *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] { | ||
as.RLock() | ||
defer as.RUnlock() | ||
|
||
return as.idempotencyKeyToTx[key] | ||
} | ||
|
||
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) findLatestSequence() SEQ { | ||
as.RLock() | ||
defer as.RUnlock() | ||
|
||
var maxSeq SEQ | ||
if as.inprogress != nil && as.inprogress.Sequence != nil { | ||
if (*as.inprogress.Sequence).Int64() > maxSeq.Int64() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably out of score for this PR, but should we use comparable restriction for SEQ instead of relaying on evm specific property There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would say yes lets add that in |
||
maxSeq = *as.inprogress.Sequence | ||
} | ||
} | ||
for _, tx := range as.unconfirmed { | ||
if tx.Sequence == nil { | ||
continue | ||
} | ||
if (*tx.Sequence).Int64() > maxSeq.Int64() { | ||
maxSeq = *tx.Sequence | ||
} | ||
} | ||
|
||
return maxSeq | ||
} | ||
|
||
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { | ||
as.RLock() | ||
defer as.RUnlock() | ||
|
||
tx := as.unstarted.PeekNextTx() | ||
if tx == nil { | ||
return nil, fmt.Errorf("peek_next_unstarted_tx: %w (address: %s)", ErrTxnNotFound, as.fromAddress) | ||
} | ||
|
||
return tx, nil | ||
} | ||
|
||
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) peekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { | ||
as.RLock() | ||
defer as.RUnlock() | ||
|
||
tx := as.inprogress | ||
if tx == nil { | ||
return nil, fmt.Errorf("peek_in_progress_tx: %w (address: %s)", ErrTxnNotFound, as.fromAddress) | ||
} | ||
|
||
return tx, nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sharing pointer to tx feels extremely unsafe, should we implement marshal/unmarshal when receiving/returning instance. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with this 💯 . We should think of ways to do this safely |
||
} | ||
|
||
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) addTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { | ||
as.Lock() | ||
defer as.Unlock() | ||
|
||
if as.unstarted.Len() >= as.unstarted.Cap() { | ||
return fmt.Errorf("move_tx_to_unstarted: address %s unstarted queue capactiry has been reached", as.fromAddress) | ||
} | ||
|
||
as.unstarted.AddTx(tx) | ||
if tx.IdempotencyKey != nil { | ||
as.idempotencyKeyToTx[*tx.IdempotencyKey] = tx | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveUnstartedToInProgress(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { | ||
as.Lock() | ||
defer as.Unlock() | ||
|
||
if as.inprogress != nil { | ||
return fmt.Errorf("move_unstarted_to_in_progress: address %s already has a transaction in progress", as.fromAddress) | ||
} | ||
|
||
if tx != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in which cases caller knowns better which tx to move? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could you reword this statement? I am not sure i understand There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. moveUnstartedToInProgress accepts tx as argument, but allows it to be nil. In that case we get tx from the priority queue. In which cases caller of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. my reasoning for doing this was to minimize potential conflicts with the persistent store and the inmemory state. so in this case the caller gets the next unstarted tx from the db. I think we should move from this in the future though... it might make sense to exclude it until we are ready to use it though |
||
// if tx is not nil then remove the tx from the unstarted queue | ||
// TODO(jtw): what should be the unique idenitifier for each transaction? ID is being set by the postgres DB | ||
tx = as.unstarted.RemoveTxByID(tx.ID) | ||
} else { | ||
// if tx is nil then pop the next unstarted transaction | ||
tx = as.unstarted.RemoveNextTx() | ||
} | ||
if tx == nil { | ||
return fmt.Errorf("move_unstarted_to_in_progress: no unstarted transaction to move to in_progress") | ||
} | ||
tx.State = TxInProgress | ||
as.inprogress = tx | ||
|
||
return nil | ||
} | ||
|
||
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) moveInProgressToUnconfirmed( | ||
txAttempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], | ||
) error { | ||
as.Lock() | ||
defer as.Unlock() | ||
|
||
tx := as.inprogress | ||
if tx == nil { | ||
return fmt.Errorf("move_in_progress_to_unconfirmed: no transaction in progress") | ||
} | ||
tx.State = TxUnconfirmed | ||
|
||
var found bool | ||
for i := 0; i < len(tx.TxAttempts); i++ { | ||
if tx.TxAttempts[i].ID == txAttempt.ID { | ||
tx.TxAttempts[i] = txAttempt | ||
found = true | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. break? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good catch! thanks for spotting |
||
break | ||
} | ||
} | ||
if !found { | ||
// NOTE(jtw): this would mean that the TxAttempt did not exist for the Tx | ||
// NOTE(jtw): should this log a warning? | ||
// NOTE(jtw): can this happen? | ||
tx.TxAttempts = append(tx.TxAttempts, txAttempt) | ||
} | ||
|
||
as.unconfirmed[tx.ID] = tx | ||
as.inprogress = nil | ||
|
||
return nil | ||
} | ||
|
||
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) abandon() { | ||
as.Lock() | ||
defer as.Unlock() | ||
|
||
for as.unstarted.Len() > 0 { | ||
tx := as.unstarted.RemoveNextTx() | ||
abandon[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](tx) | ||
} | ||
|
||
if as.inprogress != nil { | ||
tx := as.inprogress | ||
abandon[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](tx) | ||
as.inprogress = nil | ||
} | ||
for _, tx := range as.unconfirmed { | ||
abandon[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](tx) | ||
} | ||
for _, tx := range as.idempotencyKeyToTx { | ||
abandon[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE](tx) | ||
} | ||
|
||
clear(as.unconfirmed) | ||
} | ||
|
||
func abandon[ | ||
CHAIN_ID types.ID, | ||
ADDR, TX_HASH, BLOCK_HASH types.Hashable, | ||
R txmgrtypes.ChainReceipt[TX_HASH, BLOCK_HASH], | ||
SEQ types.Sequence, | ||
FEE feetypes.Fee, | ||
](tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { | ||
if tx == nil { | ||
return | ||
} | ||
|
||
tx.State = TxFatalError | ||
tx.Sequence = nil | ||
tx.Error = null.NewString("abandoned", true) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -698,7 +698,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next | |
defer cancel() | ||
etx := &txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{} | ||
if err := eb.txStore.FindNextUnstartedTransactionFromAddress(ctx, etx, fromAddress, eb.chainID); err != nil { | ||
if errors.Is(err, sql.ErrNoRows) { | ||
if errors.Is(err, sql.ErrNoRows) || errors.Is(err, ErrTxnNotFound) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It feels weird to force users of txStore to check for two different error, that actually mean the same. Won't it be easier to return sql.ErrNoRows from in memory storage There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I want to get rid of the reliance on sql.ErrNoRows so this is the first step. relying on it forces us to be dependent on the sql pkg... i think we should use our own error types for it so we can make this less dependent |
||
// Finish. No more transactions left to process. Hoorah! | ||
return nil, nil | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there guarantee that two different instances of core won't run simultaneously on top of single DB? Example:(during maintenance or update of the node)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no i dont think there is. what scenarios are you thinking of?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In theory we might have a node that persists new tx after our node fetched them