Skip to content

Commit

Permalink
Change...a lot of things to circumvent Amino
Browse files Browse the repository at this point in the history
  • Loading branch information
zivkovicmilos committed Jan 4, 2024
1 parent 34b9f9d commit 538ba7b
Show file tree
Hide file tree
Showing 31 changed files with 328 additions and 112 deletions.
64 changes: 40 additions & 24 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import (
"fmt"

"github.com/gnolang/tx-indexer/client"
"github.com/gnolang/tx-indexer/events"
"github.com/gnolang/tx-indexer/fetch"

Check failure on line 10 in cmd/start.go

View workflow job for this annotation

GitHub Actions / Go Linter / lint

could not import github.com/gnolang/tx-indexer/fetch (-: # github.com/gnolang/tx-indexer/fetch

Check failure on line 10 in cmd/start.go

View workflow job for this annotation

GitHub Actions / Go Linter / lint

could not import github.com/gnolang/tx-indexer/fetch (-: # github.com/gnolang/tx-indexer/fetch

Check failure on line 10 in cmd/start.go

View workflow job for this annotation

GitHub Actions / Go Linter / lint

could not import github.com/gnolang/tx-indexer/fetch (-: # github.com/gnolang/tx-indexer/fetch

Check failure on line 10 in cmd/start.go

View workflow job for this annotation

GitHub Actions / Go Linter / lint

could not import github.com/gnolang/tx-indexer/fetch (-: # github.com/gnolang/tx-indexer/fetch

Check failure on line 10 in cmd/start.go

View workflow job for this annotation

GitHub Actions / Go Linter / lint

could not import github.com/gnolang/tx-indexer/fetch (-: # github.com/gnolang/tx-indexer/fetch
"github.com/gnolang/tx-indexer/serve"
"github.com/gnolang/tx-indexer/serve/handlers/block"
"github.com/gnolang/tx-indexer/serve/handlers/tx"
"github.com/gnolang/tx-indexer/storage"
"github.com/peterbourgon/ff/v3/ffcli"
"go.uber.org/zap"
Expand Down Expand Up @@ -107,37 +106,25 @@ func (c *startCfg) exec(ctx context.Context) error {
}
}()

// Create the fetcher instance
// Create an Event Manager instance
em := events.NewManager()

// Create the fetcher service
f := fetch.New(
db,
client.NewClient(c.remote),
em,
fetch.WithLogger(
logger.Named("fetcher"),
),
)

// Create the JSON-RPC service
j := serve.NewJSONRPC(
serve.WithLogger(
logger.Named("json-rpc"),
),
serve.WithListenAddress(
c.listenAddress,
),
)

txHandler := tx.NewHandler(db)
blockHandler := block.NewHandler(db)

// Register handlers
j.RegisterHandler(
"getTx",
txHandler.GetTxHandler,
)

j.RegisterHandler(
"getBlock",
blockHandler.GetBlockHandler,
j := setupJSONRPC(
c.listenAddress,
db,
em,
logger,
)

// Create a new waiter
Expand All @@ -152,3 +139,32 @@ func (c *startCfg) exec(ctx context.Context) error {
// Wait for the services to stop
return w.wait()
}

// setupJSONRPC sets up the JSONRPC instance
func setupJSONRPC(
listenAddress string,
db *storage.Storage,
em *events.Manager,
logger *zap.Logger,
) *serve.JSONRPC {
j := serve.NewJSONRPC(
em,
serve.WithLogger(
logger.Named("json-rpc"),
),
serve.WithListenAddress(
listenAddress,
),
)

// Transaction handlers
j.RegisterTxEndpoints(db)

// Block handlers
j.RegisterBlockEndpoints(db)

// Sub handlers
j.RegisterSubEndpoints(db)

return j
}
15 changes: 11 additions & 4 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

storageErrors "github.com/gnolang/tx-indexer/storage/errors"
"github.com/gnolang/tx-indexer/types"
queue "github.com/madz-lab/insertion-queue"
"go.uber.org/zap"
)
Expand All @@ -21,6 +22,7 @@ const (
type Fetcher struct {
storage Storage
client Client
events Events

logger *zap.Logger
chunkBuffer *slots
Expand All @@ -32,11 +34,13 @@ type Fetcher struct {
func New(
storage Storage,
client Client,
events Events,
opts ...Option,
) *Fetcher {
f := &Fetcher{
storage: storage,
client: client,
events: events,
queryInterval: 1 * time.Second,
logger: zap.NewNop(),
chunkBuffer: &slots{Queue: make([]queue.Item, 0), maxSlots: maxSlots},
Expand All @@ -50,10 +54,6 @@ func New(
}

func (f *Fetcher) FetchTransactions(ctx context.Context) error {
defer func() {
f.logger.Info("Fetcher service shut down")
}()

collectorCh := make(chan *workerResponse, maxSlots)

startRangeFetch := func() error {
Expand Down Expand Up @@ -120,6 +120,9 @@ func (f *Fetcher) FetchTransactions(ctx context.Context) error {
for {
select {
case <-ctx.Done():
f.logger.Info("Fetcher service shut down")
close(collectorCh)

return nil
case <-ticker.C:
if err := startRangeFetch(); err != nil {
Expand Down Expand Up @@ -167,6 +170,10 @@ func (f *Fetcher) FetchTransactions(ctx context.Context) error {
}

f.logger.Debug("Saved block data", zap.Int64("number", block.Height))

f.events.SignalEvent(&types.NewBlock{
Block: block,
})
}

for _, txResult := range item.chunk.results {
Expand Down
7 changes: 7 additions & 0 deletions fetch/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
core_types "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types"
"github.com/gnolang/gno/tm2/pkg/bft/types"
clientTypes "github.com/gnolang/tx-indexer/client/types"
"github.com/gnolang/tx-indexer/events"
)

// Storage defines the transaction storage interface
Expand Down Expand Up @@ -36,3 +37,9 @@ type Client interface {
// CreateBatch creates a new client batch
CreateBatch() clientTypes.Batch
}

// Events is the events API
type Events interface {
// SignalEvent signals a new event to the event manager
SignalEvent(events.Event)
}
32 changes: 32 additions & 0 deletions serve/encode/encode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package encode

import (
"encoding/base64"
"fmt"

"github.com/gnolang/gno/tm2/pkg/amino"
)

// EncodeValue encodes the given value into Amino binary, and then to base64.
//
// The optimal route for all responses served by this indexer implementation would
// be to use plain ol' JSON.
// JSON is nice.
// json.Marshal works.
//
// However, this doesn't work with some (any, really) TM2 types (block, tx...),
// which means a top-level Amino encoder needs to be present when serving the response.
// This opens the pandora's box for another problem: custom type registration.
// In Amino, any custom type needs to be registered using a clunky API, meaning all types need to be
// accounted for beforehand, which is not something that is worth the effort of doing,
// considering there are easier ways to pass around data (remember plain ol' JSON?).
// So, we arrive at this imperfect solution: TM2 types are encoded using Amino binary, and then using base64,
// and as such are passed to the client for processing
func EncodeValue(value any) (string, error) {

Check warning on line 25 in serve/encode/encode.go

View workflow job for this annotation

GitHub Actions / Go Linter / lint

exported: func name will be used as encode.EncodeValue by other packages, and that stutters; consider calling this Value (revive)
aminoEncoding, err := amino.Marshal(value)
if err != nil {
return "", fmt.Errorf("unable to amino encode value")
}

return base64.StdEncoding.EncodeToString(aminoEncoding), nil
}
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,43 +1,47 @@
package filter

import "github.com/gnolang/tx-indexer/types"
import (
"encoding/base64"

"github.com/gnolang/gno/tm2/pkg/bft/types"
)

// BlockFilter type of filter for querying blocks
type BlockFilter struct {
*baseFilter

blockHashes [][]byte // TODO keep as strings in 0x format?
blockHashes []string
}

// NewBlockFilter creates new block filter object
func NewBlockFilter() *BlockFilter {
return &BlockFilter{
baseFilter: newBaseFilter(BlockFilterType),
blockHashes: make([][]byte, 0),
blockHashes: make([]string, 0),
}
}

// GetChanges returns all new blocks from last query
// GetChanges returns all new block headers from the last query
func (b *BlockFilter) GetChanges() any {
b.RLock()
defer b.RUnlock()

// Get hashes
hashes := b.blockHashes

// Empty hashes
// Empty headers
b.blockHashes = b.blockHashes[:0]

return hashes
}

func (b *BlockFilter) UpdateWithBlock(block types.Block) {
func (b *BlockFilter) UpdateWithBlock(block *types.Block) {
b.Lock()
defer b.Unlock()

// Fetch block hash
// Get the block hash
hash := block.Hash()

// Add hash into block hash array
b.blockHashes = append(b.blockHashes, hash)
b.blockHashes = append(b.blockHashes, base64.StdEncoding.EncodeToString(hash))
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
26 changes: 10 additions & 16 deletions serve/handlers/subs/filters/manager.go → serve/filters/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,21 @@ package filters

import (
"context"
"errors"
"fmt"
"time"

"github.com/gnolang/gno/tm2/pkg/bft/types"
"github.com/gnolang/tx-indexer/events"
"github.com/gnolang/tx-indexer/serve/conns"
"github.com/gnolang/tx-indexer/serve/handlers/subs/filters/filter"
filterSubscription "github.com/gnolang/tx-indexer/serve/handlers/subs/filters/subscription"
"github.com/gnolang/tx-indexer/types"
"go.uber.org/zap"
"github.com/gnolang/tx-indexer/serve/filters/filter"
filterSubscription "github.com/gnolang/tx-indexer/serve/filters/subscription"
commonTypes "github.com/gnolang/tx-indexer/types"
)

var ErrFilterNotFound = errors.New("filter not found")

// Manager manages all running filters
type Manager struct {
logger *zap.Logger

ctx context.Context
state Storage
storage Storage
events Events
filters *filterMap
subscriptions *subscriptionMap
Expand All @@ -31,14 +26,13 @@ type Manager struct {
// NewFilterManager creates new filter manager object
func NewFilterManager(
ctx context.Context,
state Storage,
storage Storage,
events Events,
opts ...Option,
) *Manager {
filterManager := &Manager{
ctx: ctx,
logger: zap.NewNop(),
state: state,
storage: storage,
events: events,
filters: newFilterMap(),
subscriptions: newSubMap(),
Expand Down Expand Up @@ -90,7 +84,7 @@ func (f *Manager) UninstallSubscription(id string) bool {

// subscribeToNewBlockEvent subscribes to new block events
func (f *Manager) subscribeToNewBlockEvent() {
blockSub := f.events.Subscribe([]events.Type{types.NewBlockEvent})
blockSub := f.events.Subscribe([]events.Type{commonTypes.NewBlockEvent})
defer f.events.CancelSubscription(blockSub.ID)

for {
Expand All @@ -106,7 +100,7 @@ func (f *Manager) subscribeToNewBlockEvent() {
// cannot be executed in parallel (go routines)
// because data sequencing should be persisted
// (info about block X comes before info on block X + 1)
newBlock := blockRaw.(*types.NewBlock)
newBlock := blockRaw.(*commonTypes.NewBlock)

// Apply block to filters
f.updateFiltersWithBlock(newBlock.Block)
Expand All @@ -118,7 +112,7 @@ func (f *Manager) subscribeToNewBlockEvent() {
}

// updateFiltersWithBlock updates all filters with the incoming block
func (f *Manager) updateFiltersWithBlock(block types.Block) {
func (f *Manager) updateFiltersWithBlock(block *types.Block) {
f.filters.rangeItems(func(filter Filter) {
filter.UpdateWithBlock(block)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import (
"time"

"github.com/gnolang/tx-indexer/events"
"github.com/gnolang/tx-indexer/serve/handlers/subs/filters/filter"
"github.com/gnolang/tx-indexer/serve/handlers/subs/filters/mocks"
"github.com/gnolang/tx-indexer/serve/filters/filter"
"github.com/gnolang/tx-indexer/serve/filters/mocks"
"github.com/gnolang/tx-indexer/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

// generateTestHashes generates dummy test hashes
Expand All @@ -37,7 +36,6 @@ func Test_BlockFilters(t *testing.T) {
context.Background(),
&mocks.MockStorage{},
events.NewManager(),
WithLogger(zap.NewNop()),
)

// Create block filter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ import (
"github.com/gnolang/tx-indexer/events"
)

type hashDelegate func() []byte
type (
hashDelegate func() []byte
headerDelegate func() *tm2Types.Header
)

type MockBlock struct {
HashFn hashDelegate
HashFn hashDelegate
HeaderFn headerDelegate
}

func (m *MockBlock) Hash() []byte {
Expand All @@ -21,6 +25,14 @@ func (m *MockBlock) Hash() []byte {
return nil
}

func (m *MockBlock) Header() *tm2Types.Header {
if m.HeaderFn != nil {
return m.HeaderFn()
}

return nil
}

type (
writeDataFnDelegate func(any) error
)
Expand Down
Loading

0 comments on commit 538ba7b

Please sign in to comment.