Skip to content

Commit

Permalink
Store transaction errors caught before broadcast (#936)
Browse files Browse the repository at this point in the history
* Enabled TXM to store error statuses for transactions caught before broadcast

* Addressed feedback

* Removed id from finished tx metadata to reduce memory footprint

* Updated logs
  • Loading branch information
amit-momin authored Nov 26, 2024
1 parent dcd180e commit e68ee4c
Show file tree
Hide file tree
Showing 6 changed files with 518 additions and 212 deletions.
188 changes: 126 additions & 62 deletions pkg/solana/txm/pendingtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,28 @@ type PendingTxContext interface {
OnConfirmed(sig solana.Signature) (string, error)
// OnFinalized marks transaction as Finalized, moves it from the broadcasted or confirmed map to finalized map, removes signatures from signature map to stop confirmation checks
OnFinalized(sig solana.Signature, retentionTimeout time.Duration) (string, error)
// OnPrebroadcastError adds transaction that has not yet been broadcasted to the finalized/errored map as errored, matches err type using enum
OnPrebroadcastError(id string, retentionTimeout time.Duration, txState TxState, errType TxErrType) error
// OnError marks transaction as errored, matches err type using enum, moves it from the broadcasted or confirmed map to finalized/errored map, removes signatures from signature map to stop confirmation checks
OnError(sig solana.Signature, retentionTimeout time.Duration, errType int) (string, error)
OnError(sig solana.Signature, retentionTimeout time.Duration, txState TxState, errType TxErrType) (string, error)
// GetTxState returns the transaction state for the provided ID if it exists
GetTxState(id string) (TxState, error)
// TrimFinalizedErroredTxs removes transactions that have reached their retention time
TrimFinalizedErroredTxs()
TrimFinalizedErroredTxs() int
}

// pendingTx is used to store info required to track transactions to finality or error
type pendingTx struct {
	tx  solana.Transaction
	cfg TxConfig
	// signatures holds every signature associated with this transaction; each one is
	// removed from the signature map when the transaction reaches a finished state.
	signatures []solana.Signature
	id         string
	createTs   time.Time // presumably the time the transaction was first stored; confirm against the constructor
	// state is the current lifecycle state, updated in place as the transaction
	// progresses (e.g. Processed, Confirmed).
	state TxState
}

// finishedTx is used to store minimal info specifically for finalized or errored transactions for external status checks.
// It intentionally carries no transaction payload or ID: entries are keyed by ID in the finalized/errored map.
type finishedTx struct {
// retentionTs is the instant after which the entry is eligible for trimming;
// it is set to time.Now().Add(retentionTimeout) when the entry is stored.
retentionTs time.Time
// state is the terminal state recorded for the transaction (Finalized, or the
// error state supplied by the caller).
state TxState
}
Expand All @@ -60,9 +68,9 @@ type pendingTxContext struct {
cancelBy map[string]context.CancelFunc
sigToID map[solana.Signature]string

broadcastedTxs map[string]pendingTx // transactions that require retry and bumping i.e broadcasted, processed
confirmedTxs map[string]pendingTx // transactions that require monitoring for re-org
finalizedErroredTxs map[string]pendingTx // finalized and errored transactions held onto for status
broadcastedTxs map[string]pendingTx // transactions that require retry and bumping i.e broadcasted, processed
confirmedTxs map[string]pendingTx // transactions that require monitoring for re-org
finalizedErroredTxs map[string]finishedTx // finalized and errored transactions held onto for status

lock sync.RWMutex
}
Expand All @@ -74,7 +82,7 @@ func newPendingTxContext() *pendingTxContext {

broadcastedTxs: map[string]pendingTx{},
confirmedTxs: map[string]pendingTx{},
finalizedErroredTxs: map[string]pendingTx{},
finalizedErroredTxs: map[string]finishedTx{},
}
}

Expand Down Expand Up @@ -262,7 +270,6 @@ func (c *pendingTxContext) OnProcessed(sig solana.Signature) (string, error) {
if !exists {
return id, ErrTransactionNotFound
}
tx = c.broadcastedTxs[id]
// update tx state to Processed
tx.state = Processed
// save updated tx back to the broadcasted map
Expand Down Expand Up @@ -298,15 +305,15 @@ func (c *pendingTxContext) OnConfirmed(sig solana.Signature) (string, error) {
if !sigExists {
return id, ErrSigDoesNotExist
}
if _, exists := c.broadcastedTxs[id]; !exists {
tx, exists := c.broadcastedTxs[id]
if !exists {
return id, ErrTransactionNotFound
}
// call cancel func + remove from map to stop the retry/bumping cycle for this transaction
if cancel, exists := c.cancelBy[id]; exists {
cancel() // cancel context
delete(c.cancelBy, id)
}
tx := c.broadcastedTxs[id]
// update tx state to Confirmed
tx.state = Confirmed
// move tx to confirmed map
Expand Down Expand Up @@ -371,17 +378,58 @@ func (c *pendingTxContext) OnFinalized(sig solana.Signature, retentionTimeout ti
if retentionTimeout == 0 {
return id, nil
}
// set the timestamp till which the tx should be retained in storage
tx.retentionTs = time.Now().Add(retentionTimeout)
// update tx state to Finalized
tx.state = Finalized
finalizedTx := finishedTx{
state: Finalized,
retentionTs: time.Now().Add(retentionTimeout),
}
// move transaction from confirmed to finalized map
c.finalizedErroredTxs[id] = tx
c.finalizedErroredTxs[id] = finalizedTx
return id, nil
})
}

func (c *pendingTxContext) OnError(sig solana.Signature, retentionTimeout time.Duration, _ int) (string, error) {
// OnPrebroadcastError records a transaction that failed before it was broadcast by
// adding it to the finalized/errored map with the supplied state.
//
// It returns nil without storing anything when retentionTimeout is 0,
// ErrAlreadyInExpectedState when the ID is already stored with txState, and
// ErrIDAlreadyExists when the ID is present in the broadcasted or confirmed maps.
// The error type argument is unused here.
func (c *pendingTxContext) OnPrebroadcastError(id string, retentionTimeout time.Duration, txState TxState, _ TxErrType) error {
	// With no retention window there is nothing to keep: the transaction was never stored.
	if retentionTimeout == 0 {
		return nil
	}

	// validate reports why id must not be recorded, or nil when it may be.
	// It reads the maps directly, so it is only invoked through the lock helpers.
	validate := func() error {
		if finished, found := c.finalizedErroredTxs[id]; found && finished.state == txState {
			return ErrAlreadyInExpectedState
		}
		if _, inBroadcasted := c.broadcastedTxs[id]; inBroadcasted {
			return ErrIDAlreadyExists
		}
		if _, inConfirmed := c.confirmedTxs[id]; inConfirmed {
			return ErrIDAlreadyExists
		}
		return nil
	}

	// Cheap pre-check under the read lock so the common rejection paths never
	// contend for the write lock.
	if err := c.withReadLock(validate); err != nil {
		return err
	}

	// The read and write phases are separate lock acquisitions, so the state may
	// have changed in between; validate again before writing.
	_, err := c.withWriteLock(func() (string, error) {
		if err := validate(); err != nil {
			return "", err
		}
		// add transaction to error map
		c.finalizedErroredTxs[id] = finishedTx{
			state:       txState,
			retentionTs: time.Now().Add(retentionTimeout),
		}
		return id, nil
	})
	return err
}

func (c *pendingTxContext) OnError(sig solana.Signature, retentionTimeout time.Duration, txState TxState, _ TxErrType) (string, error) {
err := c.withReadLock(func() error {
id, sigExists := c.sigToID[sig]
if !sigExists {
Expand Down Expand Up @@ -432,17 +480,16 @@ func (c *pendingTxContext) OnError(sig solana.Signature, retentionTimeout time.D
for _, s := range tx.signatures {
delete(c.sigToID, s)
}
// if retention duration is set to 0, delete transaction from storage
// otherwise, move to finalized map
// if retention duration is set to 0, skip adding transaction to the errored map
if retentionTimeout == 0 {
return id, nil
}
// set the timestamp till which the tx should be retained in storage
tx.retentionTs = time.Now().Add(retentionTimeout)
// update tx state to Errored
tx.state = Errored
erroredTx := finishedTx{
state: txState,
retentionTs: time.Now().Add(retentionTimeout),
}
// move transaction from broadcasted to error map
c.finalizedErroredTxs[id] = tx
c.finalizedErroredTxs[id] = erroredTx
return id, nil
})
}
Expand All @@ -463,18 +510,31 @@ func (c *pendingTxContext) GetTxState(id string) (TxState, error) {
}

// TrimFinalizedErroredTxs deletes transactions from the finalized/errored map and the allTxs map after the retention period has passed
func (c *pendingTxContext) TrimFinalizedErroredTxs() {
c.lock.Lock()
defer c.lock.Unlock()
expiredIDs := make([]string, 0, len(c.finalizedErroredTxs))
for id, tx := range c.finalizedErroredTxs {
if time.Now().After(tx.retentionTs) {
expiredIDs = append(expiredIDs, id)
func (c *pendingTxContext) TrimFinalizedErroredTxs() int {
var expiredIDs []string
err := c.withReadLock(func() error {
expiredIDs = make([]string, 0, len(c.finalizedErroredTxs))
for id, tx := range c.finalizedErroredTxs {
if time.Now().After(tx.retentionTs) {
expiredIDs = append(expiredIDs, id)
}
}
return nil
})
if err != nil {
return 0
}
for _, id := range expiredIDs {
delete(c.finalizedErroredTxs, id)

_, err = c.withWriteLock(func() (string, error) {
for _, id := range expiredIDs {
delete(c.finalizedErroredTxs, id)
}
return "", nil
})
if err != nil {
return 0
}
return len(expiredIDs)
}

func (c *pendingTxContext) withReadLock(fn func() error) error {
Expand All @@ -496,8 +556,11 @@ type pendingTxContextWithProm struct {
chainID string
}

// TxErrType categorizes why a transaction failed (NoFailure, TxFailRevert,
// TxFailReject, TxFailDrop, TxFailSimRevert, ...). It is used to select which
// error metric is incremented.
type TxErrType int

const (
TxFailRevert = iota
NoFailure TxErrType = iota
TxFailRevert
TxFailReject
TxFailDrop
TxFailSimRevert
Expand Down Expand Up @@ -554,44 +617,45 @@ func (c *pendingTxContextWithProm) OnFinalized(sig solana.Signature, retentionTi
return id, err
}

func (c *pendingTxContextWithProm) OnError(sig solana.Signature, retentionTimeout time.Duration, errType int) (string, error) {
// special RPC rejects transaction (signature will not be valid)
if errType == TxFailReject {
promSolTxmRejectTxs.WithLabelValues(c.chainID).Add(1)
promSolTxmErrorTxs.WithLabelValues(c.chainID).Add(1)
return "", nil
func (c *pendingTxContextWithProm) OnError(sig solana.Signature, retentionTimeout time.Duration, txState TxState, errType TxErrType) (string, error) {
id, err := c.pendingTx.OnError(sig, retentionTimeout, txState, errType) // err indicates transaction not found so may already be removed
if err == nil {
incrementErrorMetrics(errType, c.chainID)
}
return id, err
}

id, err := c.pendingTx.OnError(sig, retentionTimeout, errType) // err indicates transaction not found so may already be removed
// OnPrebroadcastError delegates to the wrapped context to record a transaction
// that failed before broadcast, and increments the error metrics for errType
// only when the wrapped call succeeds.
//
// It returns the error from the wrapped OnPrebroadcastError unchanged.
func (c *pendingTxContextWithProm) OnPrebroadcastError(id string, retentionTimeout time.Duration, txState TxState, errType TxErrType) error {
	err := c.pendingTx.OnPrebroadcastError(id, retentionTimeout, txState, errType)
	// a non-nil err means the ID is already tracked or already stored in the
	// requested state, so the failure has been accounted for; skip the metrics
	if err == nil {
		incrementErrorMetrics(errType, c.chainID)
	}
	return err
}

// incrementErrorMetrics bumps the per-category failure counter matching errType
// and then the aggregate error counter, both labelled with chainID.
//
// NoFailure increments nothing. An errType with no dedicated case below still
// increments the aggregate error counter.
func incrementErrorMetrics(errType TxErrType, chainID string) {
	switch errType {
	case NoFailure:
		// Return early if no failure identified
		return
	case TxFailReject:
		promSolTxmRejectTxs.WithLabelValues(chainID).Inc()
	case TxFailRevert:
		promSolTxmRevertTxs.WithLabelValues(chainID).Inc()
	case TxFailDrop:
		promSolTxmDropTxs.WithLabelValues(chainID).Inc()
	case TxFailSimRevert:
		promSolTxmSimRevertTxs.WithLabelValues(chainID).Inc()
	case TxFailSimOther:
		promSolTxmSimOtherTxs.WithLabelValues(chainID).Inc()
	}
	// every identified failure counts exactly once toward the aggregate
	promSolTxmErrorTxs.WithLabelValues(chainID).Inc()
}

// GetTxState looks up the state for the given transaction ID by forwarding to
// the wrapped context; the result and error are passed through untouched.
func (c *pendingTxContextWithProm) GetTxState(id string) (TxState, error) {
	state, err := c.pendingTx.GetTxState(id)
	return state, err
}

func (c *pendingTxContextWithProm) TrimFinalizedErroredTxs() {
c.pendingTx.TrimFinalizedErroredTxs()
func (c *pendingTxContextWithProm) TrimFinalizedErroredTxs() int {
return c.pendingTx.TrimFinalizedErroredTxs()
}
Loading

0 comments on commit e68ee4c

Please sign in to comment.