Skip to content

Commit

Permalink
feat(baseapp): integrate the appdata.Listener in baseapp (#21965)
Browse files Browse the repository at this point in the history
  • Loading branch information
cool-develope authored Oct 3, 2024
1 parent c8f4cf7 commit 80726f7
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 8 deletions.
9 changes: 8 additions & 1 deletion baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sort"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -831,7 +832,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
// NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g.
// vote extensions, so skip those.
txResults := make([]*abci.ExecTxResult, 0, len(req.Txs))
for _, rawTx := range req.Txs {
for txIndex, rawTx := range req.Txs {

response := app.deliverTx(rawTx)

Expand All @@ -843,6 +844,12 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz
// continue
}

// append the tx index to the response.Events
for i, event := range response.Events {
response.Events[i].Attributes = append(event.Attributes,
abci.EventAttribute{Key: "tx_index", Value: strconv.Itoa(txIndex)})
}

txResults = append(txResults, response)
}

Expand Down
2 changes: 1 addition & 1 deletion baseapp/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ func TestABCI_FinalizeBlock_DeliverTx(t *testing.T) {

events := res.TxResults[i].GetEvents()
require.Len(t, events, 3, "should contain ante handler, message type and counter events respectively")
require.Equal(t, sdk.MarkEventsToIndex(counterEvent("ante_handler", counter).ToABCIEvents(), map[string]struct{}{})[0], events[0], "ante handler event")
require.Equal(t, sdk.MarkEventsToIndex(counterEvent("ante_handler", counter).ToABCIEvents(), map[string]struct{}{})[0].Attributes[0], events[0].Attributes[0], "ante handler event")
require.Equal(t, sdk.MarkEventsToIndex(counterEvent(sdk.EventTypeMessage, counter).ToABCIEvents(), map[string]struct{}{})[0].Attributes[0], events[2].Attributes[0], "msg handler update counter event")
}

Expand Down
17 changes: 17 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,15 @@ func (app *BaseApp) preBlock(req *abci.FinalizeBlockRequest) ([]abci.Event, erro
ctx = ctx.WithBlockGasMeter(gasMeter)
app.finalizeBlockState.SetContext(ctx)
events = ctx.EventManager().ABCIEvents()

// append PreBlock attributes to all events
for i, event := range events {
events[i].Attributes = append(
event.Attributes,
abci.EventAttribute{Key: "mode", Value: "PreBlock"},
abci.EventAttribute{Key: "event_index", Value: strconv.Itoa(i)},
)
}
}
return events, nil
}
Expand All @@ -739,6 +748,7 @@ func (app *BaseApp) beginBlock(_ *abci.FinalizeBlockRequest) (sdk.BeginBlock, er
resp.Events[i].Attributes = append(
event.Attributes,
abci.EventAttribute{Key: "mode", Value: "BeginBlock"},
abci.EventAttribute{Key: "event_index", Value: strconv.Itoa(i)},
)
}

Expand Down Expand Up @@ -801,6 +811,7 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) {
eb.Events[i].Attributes = append(
event.Attributes,
abci.EventAttribute{Key: "mode", Value: "EndBlock"},
abci.EventAttribute{Key: "event_index", Value: strconv.Itoa(i)},
)
}

Expand Down Expand Up @@ -1151,6 +1162,12 @@ func createEvents(cdc codec.Codec, events sdk.Events, msg sdk.Msg, reflectMsg pr
}
}

// append the event_index attribute to all events
msgEvent = msgEvent.AppendAttributes(sdk.NewAttribute("event_index", "0"))
for i, event := range events {
events[i] = event.AppendAttributes(sdk.NewAttribute("event_index", strconv.Itoa(i+1)))
}

return sdk.Events{msgEvent}.AppendEvents(events), nil
}

Expand Down
103 changes: 97 additions & 6 deletions baseapp/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sort"
"strconv"
"strings"

abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
Expand Down Expand Up @@ -143,21 +144,111 @@ func exposeStoreKeysSorted(keysStr []string, keys map[string]*storetypes.KVStore
return exposeStoreKeys
}

func eventToAppDataEvent(event abci.Event) (appdata.Event, error) {
appdataEvent := appdata.Event{
Type: event.Type,
Attributes: func() ([]appdata.EventAttribute, error) {
attrs := make([]appdata.EventAttribute, len(event.Attributes))
for j, attr := range event.Attributes {
attrs[j] = appdata.EventAttribute{
Key: attr.Key,
Value: attr.Value,
}
}
return attrs, nil
},
}

for _, attr := range event.Attributes {
if attr.Key == "mode" {
switch attr.Value {
case "PreBlock":
appdataEvent.BlockStage = appdata.PreBlockStage
case "BeginBlock":
appdataEvent.BlockStage = appdata.BeginBlockStage
case "EndBlock":
appdataEvent.BlockStage = appdata.EndBlockStage
default:
appdataEvent.BlockStage = appdata.UnknownBlockStage
}
} else if attr.Key == "tx_index" {
txIndex, err := strconv.Atoi(attr.Value)
if err != nil {
return appdata.Event{}, err
}
appdataEvent.TxIndex = int32(txIndex + 1)
appdataEvent.BlockStage = appdata.TxProcessingStage
} else if attr.Key == "msg_index" {
msgIndex, err := strconv.Atoi(attr.Value)
if err != nil {
return appdata.Event{}, err
}
appdataEvent.MsgIndex = int32(msgIndex + 1)
} else if attr.Key == "event_index" {
eventIndex, err := strconv.Atoi(attr.Value)
if err != nil {
return appdata.Event{}, err
}
appdataEvent.EventIndex = int32(eventIndex + 1)
}
}

return appdataEvent, nil
}

type listenerWrapper struct {
listener appdata.Listener
}

// NewListenerWrapper creates a new listenerWrapper.
// It is only used for testing purposes.
func NewListenerWrapper(listener appdata.Listener) listenerWrapper {
return listenerWrapper{listener: listener}
}

func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.FinalizeBlockRequest, res abci.FinalizeBlockResponse) error {
if p.listener.StartBlock != nil {
err := p.listener.StartBlock(appdata.StartBlockData{
Height: uint64(req.Height),
})
if err != nil {
if err := p.listener.StartBlock(appdata.StartBlockData{
Height: uint64(req.Height),
HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
}); err != nil {
return err
}
}
if p.listener.OnTx != nil {
for i, tx := range req.Txs {
if err := p.listener.OnTx(appdata.TxData{
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
}); err != nil {
return err
}
}
}
if p.listener.OnEvent != nil {
events := make([]appdata.Event, len(res.Events))
var err error
for i, event := range res.Events {
events[i], err = eventToAppDataEvent(event)
if err != nil {
return err
}
}
for _, txResult := range res.TxResults {
for _, event := range txResult.Events {
appdataEvent, err := eventToAppDataEvent(event)
if err != nil {
return err
}
events = append(events, appdataEvent)
}
}
if err := p.listener.OnEvent(appdata.EventData{Events: events}); err != nil {
return err
}
}

//// TODO txs, events

return nil
}
Expand Down
Loading

0 comments on commit 80726f7

Please sign in to comment.