Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: pre-estimation don't run in parallel #523

Merged
merged 5 commits into from
Sep 12, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 68 additions & 37 deletions app/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"context"
"io"
"sync"
"sync/atomic"

"cosmossdk.io/collections"
Expand All @@ -21,6 +22,8 @@
blockstm "github.com/crypto-org-chain/go-block-stm"
)

const MinimalParallelPreEstimate = 32

func DefaultTxExecutor(_ context.Context,
txs [][]byte,
ms storetypes.MultiStore,
Expand Down Expand Up @@ -73,17 +76,14 @@
incarnationCache[i].Store(&m)
}

var estimates map[int]blockstm.MultiLocations
memTxs := make([]sdk.Tx, len(txs))
var (
estimates map[int]blockstm.MultiLocations
memTxs []sdk.Tx
)

Check warning on line 82 in app/executor.go

View check run for this annotation

Codecov / codecov/patch

app/executor.go#L79-L82

Added lines #L79 - L82 were not covered by tests
if estimate {
for i, rawTx := range txs {
if memTx, err := txDecoder(rawTx); err == nil {
memTxs[i] = memTx
}
}
// pre-estimation
evmDenom := evmKeeper.GetParams(sdk.NewContext(ms, cmtproto.Header{}, false, log.NewNopLogger())).EvmDenom
estimates = preEstimates(memTxs, authStore, bankStore, evmDenom)
memTxs, estimates = preEstimates(txs, workers, authStore, bankStore, evmDenom, txDecoder)

Check warning on line 86 in app/executor.go

View check run for this annotation

Codecov / codecov/patch

app/executor.go#L86

Added line #L86 was not covered by tests
}

if err := blockstm.ExecuteBlockWithEstimates(
Expand Down Expand Up @@ -188,40 +188,71 @@

// preEstimates returns a static estimation of the written keys for each transaction.
// NOTE: make sure it sync with the latest sdk logic when sdk upgrade.
func preEstimates(txs []sdk.Tx, authStore, bankStore int, evmDenom string) map[int]blockstm.MultiLocations {
func preEstimates(txs [][]byte, workers, authStore, bankStore int, evmDenom string, txDecoder sdk.TxDecoder) ([]sdk.Tx, map[int]blockstm.MultiLocations) {

Check failure on line 191 in app/executor.go

View workflow job for this annotation

GitHub Actions / Run golangci-lint

the line is 154 characters long, which exceeds the maximum of 150 characters. (lll)
memTxs := make([]sdk.Tx, len(txs))

Check warning on line 192 in app/executor.go

View check run for this annotation

Codecov / codecov/patch

app/executor.go#L191-L192

Added lines #L191 - L192 were not covered by tests
estimates := make(map[int]blockstm.MultiLocations, len(txs))
for i, tx := range txs {
feeTx, ok := tx.(sdk.FeeTx)
if !ok {
continue
}
feePayer := sdk.AccAddress(feeTx.FeePayer())

// account key
accKey, err := collections.EncodeKeyWithPrefix(
authtypes.AddressStoreKeyPrefix,
sdk.AccAddressKey,
feePayer,
)
if err != nil {
continue
}
job := func(start, end int) {
for i := start; i < end; i++ {
rawTx := txs[i]
tx, err := txDecoder(rawTx)
if err != nil {
continue

Check warning on line 200 in app/executor.go

View check run for this annotation

Codecov / codecov/patch

app/executor.go#L195-L200

Added lines #L195 - L200 were not covered by tests
}
memTxs[i] = tx

Check warning on line 202 in app/executor.go

View check run for this annotation

Codecov / codecov/patch

app/executor.go#L202

Added line #L202 was not covered by tests

// balance key
balanceKey, err := collections.EncodeKeyWithPrefix(
banktypes.BalancesPrefix,
collections.PairKeyCodec(sdk.AccAddressKey, collections.StringKey),
collections.Join(feePayer, evmDenom),
)
if err != nil {
continue
}
feeTx, ok := tx.(sdk.FeeTx)
if !ok {
continue

Check warning on line 206 in app/executor.go

View check run for this annotation

Codecov / codecov/patch

app/executor.go#L204-L206

Added lines #L204 - L206 were not covered by tests
}
feePayer := sdk.AccAddress(feeTx.FeePayer())

Check warning on line 208 in app/executor.go

View check run for this annotation

Codecov / codecov/patch

app/executor.go#L208

Added line #L208 was not covered by tests

// account key
accKey, err := collections.EncodeKeyWithPrefix(
authtypes.AddressStoreKeyPrefix,
sdk.AccAddressKey,
feePayer,
)
if err != nil {
mmsqe marked this conversation as resolved.
Show resolved Hide resolved
continue

Check warning on line 217 in app/executor.go

View check run for this annotation

Codecov / codecov/patch

app/executor.go#L211-L217

Added lines #L211 - L217 were not covered by tests
}

estimates[i] = blockstm.MultiLocations{
authStore: {accKey},
bankStore: {balanceKey},
// balance key
balanceKey, err := collections.EncodeKeyWithPrefix(
banktypes.BalancesPrefix,
collections.PairKeyCodec(sdk.AccAddressKey, collections.StringKey),
collections.Join(feePayer, evmDenom),
)
if err != nil {
continue

Check warning on line 227 in app/executor.go

View check run for this annotation

Codecov / codecov/patch

app/executor.go#L221-L227

Added lines #L221 - L227 were not covered by tests
}

estimates[i] = blockstm.MultiLocations{
authStore: {accKey},
bankStore: {balanceKey},

Check warning on line 232 in app/executor.go

View check run for this annotation

Codecov / codecov/patch

app/executor.go#L230-L232

Added lines #L230 - L232 were not covered by tests
}
}
}

return estimates
blockSize := len(txs)
chunk := blockSize / workers
if blockSize < MinimalParallelPreEstimate || chunk == 0 {
job(0, blockSize)
} else {
var wg sync.WaitGroup
wg.Add(workers)
for i := 0; i < workers; i++ {
start := i * chunk
end := (i + 1) * chunk
if i == workers-1 {
end = blockSize

Check warning on line 248 in app/executor.go

View check run for this annotation

Codecov / codecov/patch

app/executor.go#L237-L248

Added lines #L237 - L248 were not covered by tests
}
go func() {
defer wg.Done()
job(start, end)
}()

Check warning on line 253 in app/executor.go

View check run for this annotation

Codecov / codecov/patch

app/executor.go#L250-L253

Added lines #L250 - L253 were not covered by tests
Fixed Show fixed Hide fixed
}
wg.Wait()

Check warning on line 255 in app/executor.go

View check run for this annotation

Codecov / codecov/patch

app/executor.go#L255

Added line #L255 was not covered by tests
}
return memTxs, estimates

Check warning on line 257 in app/executor.go

View check run for this annotation

Codecov / codecov/patch

app/executor.go#L257

Added line #L257 was not covered by tests
}
Loading