diff --git a/.gitignore b/.gitignore index e973c33e..eca204c2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,9 @@ # MacOS Leftovers .DS_Store +# Data Leftovers +indexer-db/* + # Editor Leftovers .vscode .idea diff --git a/client/http.go b/client/http.go new file mode 100644 index 00000000..28e953c1 --- /dev/null +++ b/client/http.go @@ -0,0 +1,47 @@ +package client + +import ( + "fmt" + + rpcClient "github.com/gnolang/gno/tm2/pkg/bft/rpc/client" + core_types "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types" +) + +// Client is the TM2 HTTP client +type Client struct { + client rpcClient.Client +} + +// NewClient creates a new TM2 HTTP client +func NewClient(remote string) *Client { + return &Client{ + client: rpcClient.NewHTTP(remote, ""), + } +} + +func (c *Client) GetLatestBlockNumber() (int64, error) { + status, err := c.client.Status() + if err != nil { + return 0, fmt.Errorf("unable to get chain status, %w", err) + } + + return status.SyncInfo.LatestBlockHeight, nil +} + +func (c *Client) GetBlock(blockNum int64) (*core_types.ResultBlock, error) { + block, err := c.client.Block(&blockNum) + if err != nil { + return nil, fmt.Errorf("unable to get block, %w", err) + } + + return block, nil +} + +func (c *Client) GetBlockResults(blockNum int64) (*core_types.ResultBlockResults, error) { + results, err := c.client.BlockResults(&blockNum) + if err != nil { + return nil, fmt.Errorf("unable to get block results, %w", err) + } + + return results, nil +} diff --git a/cmd/start.go b/cmd/start.go index d5ceec06..ba64f82a 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -3,11 +3,26 @@ package main import ( "context" "flag" + "fmt" + "github.com/gnolang/tx-indexer/client" + "github.com/gnolang/tx-indexer/fetch" + "github.com/gnolang/tx-indexer/serve" + "github.com/gnolang/tx-indexer/storage" "github.com/peterbourgon/ff/v3/ffcli" + "go.uber.org/zap" ) -type startCfg struct{} +const ( + defaultRemote = "http://127.0.0.1:26657" + defaultDBPath = "indexer-db" +) + +type startCfg struct { + listenAddress string + remote string + dbPath string +} // newStartCmd creates the indexer start command func newStartCmd() *ffcli.Command { @@ -19,19 +34,87 @@ func newStartCmd() *ffcli.Command { return &ffcli.Command{ Name: "start", ShortUsage: "start [flags]", - LongHelp: "Starts the indexer", + ShortHelp: "Starts the indexer service", + LongHelp: "Starts the indexer service, which includes the fetcher and JSON-RPC server", FlagSet: fs, - Exec: cfg.exec, + Exec: func(ctx context.Context, _ []string) error { + return cfg.exec(ctx) + }, } } // registerFlags registers the indexer start command flags -func (c *startCfg) registerFlags(_ *flag.FlagSet) { - // TODO define flags +func (c *startCfg) registerFlags(fs *flag.FlagSet) { + fs.StringVar( + &c.listenAddress, + "listen-address", + serve.DefaultListenAddress, + "the IP:PORT URL for the indexer JSON-RPC server", + ) + + fs.StringVar( + &c.remote, + "remote", + defaultRemote, + "the JSON-RPC URL of the Gno chain", + ) + + fs.StringVar( + &c.dbPath, + "db-path", + defaultDBPath, + "the absolute path for the indexer DB (embedded)", + ) } // exec executes the indexer start command -func (c *startCfg) exec(_ context.Context, _ []string) error { - // TODO add implementation - return nil +func (c *startCfg) exec(ctx context.Context) error { + // Create a new logger + logger, err := zap.NewDevelopment() + if err != nil { + return fmt.Errorf("unable to create logger, %w", err) + } + + // Create a DB instance + db, err := storage.New(c.dbPath) + if err != nil { + return 
fmt.Errorf("unable to open storage DB, %w", err) + } + + defer func() { + if err := db.Close(); err != nil { + logger.Error("unable to gracefully close DB", zap.Error(err)) + } + }() + + // Create the fetcher instance + f := fetch.New( + db, + client.NewClient(c.remote), + fetch.WithLogger( + logger.Named("fetcher"), + ), + ) + + // Create the JSON-RPC service + j := serve.NewJSONRPC( + serve.WithLogger( + logger.Named("json-rpc"), + ), + serve.WithListenAddress( + c.listenAddress, + ), + ) + + // Create a new waiter + w := newWaiter(ctx) + + // Add the fetcher service + w.add(f.FetchTransactions) + + // Add the JSON-RPC service + w.add(j.Serve) + + // Wait for the services to stop + return w.wait() } diff --git a/cmd/waiter.go b/cmd/waiter.go new file mode 100644 index 00000000..e70ae10b --- /dev/null +++ b/cmd/waiter.go @@ -0,0 +1,66 @@ +package main + +import ( + "context" + "os" + "os/signal" + "syscall" + + "golang.org/x/sync/errgroup" +) + +type waitFunc func(ctx context.Context) error + +// waiter is a concept used for waiting on running services +type waiter struct { + ctx context.Context + cancel context.CancelFunc + + waitFns []waitFunc +} + +// newWaiter creates a new waiter instance +func newWaiter(ctx context.Context) *waiter { + w := &waiter{ + waitFns: []waitFunc{}, + } + + w.ctx, w.cancel = signal.NotifyContext( + ctx, + os.Interrupt, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT, + ) + + return w +} + +// add adds a new wait service +func (w *waiter) add(fns ...waitFunc) { + w.waitFns = append(w.waitFns, fns...) +} + +// wait blocks until all added wait services finish +func (w *waiter) wait() error { + g, ctx := errgroup.WithContext(w.ctx) + + g.Go(func() error { + <-ctx.Done() + w.cancel() + + return nil + }) + + for _, fn := range w.waitFns { + fn := fn + + g.Go( + func() error { + return fn(ctx) + }, + ) + } + + return g.Wait() +} diff --git a/fetch/fetch.go b/fetch/fetch.go index 3f418f51..0914b3e7 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -2,26 +2,34 @@ package fetch import ( "context" + "encoding/base64" "errors" "fmt" + "sort" "time" - "github.com/gnolang/gno/tm2/pkg/bft/types" storageErrors "github.com/gnolang/tx-indexer/storage/errors" + queue "github.com/madz-lab/insertion-queue" "go.uber.org/zap" ) +const ( + maxSlots = 100 + maxChunkSize = 50 +) + type Fetcher struct { storage Storage client Client logger *zap.Logger + chunkBuffer *slots queryInterval time.Duration // block query interval } -// NewFetcher creates a new transaction result fetcher instance -// that gets data from a remote chain -func NewFetcher( +// New creates a new data fetcher instance +// that gets blockchain data from a remote chain +func New( storage Storage, client Client, opts ...Option, @@ -31,6 +39,7 @@ func NewFetcher( client: client, queryInterval: 1 * time.Second, logger: zap.NewNop(), + chunkBuffer: &slots{Queue: make([]queue.Item, 0), maxSlots: maxSlots}, } for _, opt := range opts { @@ -40,109 +49,170 @@ func NewFetcher( return f } -// FetchTransactions runs the transaction fetcher [BLOCKING] func (f *Fetcher) FetchTransactions(ctx context.Context) error { - // catchupWithChain syncs any transactions that have occurred - // between the local last block (in storage) and the chain state (latest head) - catchupWithChain := func(lastBlock int64) (int64, error) { + defer func() { + f.logger.Info("Fetcher service shut down") + }() + + collectorCh := make(chan *workerResponse, 10) + + callback := func() error { + // Check if there are any free slots + if 
f.chunkBuffer.Len() == maxSlots { + // Currently no free slot exists + return nil + } + + // Fetch the latest saved height + latestLocal, err := f.storage.GetLatestHeight() + if err != nil && !errors.Is(err, storageErrors.ErrNotFound) { + return fmt.Errorf("unable to fetch latest block height, %w", err) + } + // Fetch the latest block from the chain latest, latestErr := f.client.GetLatestBlockNumber() if latestErr != nil { - return 0, fmt.Errorf("unable to fetch latest block number, %w", latestErr) + f.logger.Error("unable to fetch latest block number", zap.Error(latestErr)) + + return nil } // Check if there is a block gap - if lastBlock == latest { + if latest <= latestLocal { // No gap, nothing to sync - return latest, nil + return nil } - // Catch up to the latest block - for block := lastBlock + 1; block <= latest; block++ { - if fetchErr := f.fetchAndSaveBlockData(block); fetchErr != nil { - return 0, fetchErr - } - } + gaps := f.chunkBuffer.reserveChunkRanges( + latestLocal+1, + latest, + maxChunkSize, + ) - // Return the latest available block - return latest, nil - } + for _, gap := range gaps { + f.logger.Info( + "Fetching range", + zap.Int64("from", gap.from), + zap.Int64("to", gap.to), + ) + + // Spawn worker + info := &workerInfo{ + chunkRange: gap, + resCh: collectorCh, + } - // Fetch the latest tx from storage - currentHeight, err := f.storage.GetLatestHeight() - if err != nil && !errors.Is(err, storageErrors.ErrNotFound) { - return fmt.Errorf("unable to fetch latest transaction, %w", err) - } + go handleChunk(ctx, f.client, info) + } - // "Catch up" initially with the chain - if currentHeight, err = catchupWithChain(currentHeight); err != nil { - return err + return nil } // Start a listener for monitoring new blocks ticker := time.NewTicker(f.queryInterval) defer ticker.Stop() + if err := callback(); err != nil { + return err + } + for { select { case <-ctx.Done(): - f.logger.Info("Stopping fetch service...") - return nil case <-ticker.C: - if currentHeight, err = catchupWithChain(currentHeight); err != nil { + if err := callback(); err != nil { return err } - } - } -} + case response := <-collectorCh: + pruneSlot := func(index int) { + // Prune the element + f.chunkBuffer.Queue = append( + f.chunkBuffer.Queue[:index], + f.chunkBuffer.Queue[index+1:]..., + ) + + // Fix the queue + f.chunkBuffer.Fix() + } -// fetchAndSaveBlockData commits the block data to storage -func (f *Fetcher) fetchAndSaveBlockData(blockNum int64) error { - // Get block info from the chain - block, err := f.client.GetBlock(blockNum) - if err != nil { - return fmt.Errorf("unable to fetch block, %w", err) - } + // Find the slot index + index := sort.Search(f.chunkBuffer.Len(), func(i int) bool { + return f.chunkBuffer.getSlot(i).chunkRange.from >= response.chunkRange.from + }) - if saveErr := f.storage.SaveBlock(block.Block); saveErr != nil { - return fmt.Errorf("unable to save block, %w", saveErr) - } + if response.error != nil { + f.logger.Error( + "error encountered during chunk fetch", + zap.Error(response.error), + ) - f.logger.Info("Saved block data", zap.Int64("number", block.Block.Height)) + // Prune the chunk as it is invalid + pruneSlot(index) - // Skip empty blocks - if block.Block.NumTxs == 0 { - f.logger.Debug("Block is empty", zap.Int64("number", block.Block.Height)) + continue + } - return nil - } + // Save the chunk + f.chunkBuffer.setChunk(index, response.chunk) - // Get the transaction execution results - txResults, err := f.client.GetBlockResults(blockNum) - if err != nil { - return 
fmt.Errorf("unable to fetch block results, %w", err) - } + // Fetch the latest store sequence + latestLocal, err := f.storage.GetLatestHeight() + if err != nil && !errors.Is(err, storageErrors.ErrNotFound) { + return fmt.Errorf("unable to fetch latest block height, %w", err) + } - // Save the transaction result to the storage - for index, tx := range block.Block.Txs { - result := &types.TxResult{ - Height: block.Block.Height, - Index: uint32(index), - Tx: tx, - Response: txResults.Results.DeliverTxs[index], - } + for f.chunkBuffer.Len() > 0 { + item := f.chunkBuffer.getSlot(0) - if err := f.storage.SaveTx(result); err != nil { - return fmt.Errorf("unable to save tx, %w", err) - } + isConsecutive := item.chunkRange.from == latestLocal+1 + isFetched := item.chunk != nil - f.logger.Info( - "Saved block tx", - zap.Int64("number", block.Block.Height), - zap.String("hash", string(tx.Hash())), - ) - } + if !isConsecutive || !isFetched { + break + } + + // Pop the next chunk + f.chunkBuffer.PopFront() + + // Save the fetched data + for _, block := range item.chunk.blocks { + if saveErr := f.storage.SaveBlock(block); saveErr != nil { + // This is a design choice that really highlights the strain + // of keeping legacy testnets running. Current TM2 testnets + // have blocks / transactions that are no longer compatible + // with latest "master" changes for Amino, so these blocks / txs are ignored, + // as opposed to this error being a show-stopper for the fetcher + f.logger.Error("unable to save block", zap.String("err", err.Error())) + + continue + } + + f.logger.Info("Saved block data", zap.Int64("number", block.Height)) + } + + for _, results := range item.chunk.results { + for _, tx := range results { + if err := f.storage.SaveTx(tx); err != nil { + f.logger.Error("unable to save tx", zap.String("err", err.Error())) - return nil + continue + } + + f.logger.Info( + "Saved tx", + zap.String("hash", base64.StdEncoding.EncodeToString(tx.Tx.Hash())), + ) + } + } + + latestLocal = item.chunkRange.to + + // Save the latest height data + if err := f.storage.SaveLatestHeight(latestLocal); err != nil { + return fmt.Errorf("unable to save latest height info, %w", err) + } + } + } + } } diff --git a/fetch/fetch_test.go b/fetch/fetch_test.go index ed944e35..fde00668 100644 --- a/fetch/fetch_test.go +++ b/fetch/fetch_test.go @@ -35,7 +35,7 @@ func TestNodeFetcher_FetchTransactions_Invalid(t *testing.T) { ) // Create the fetcher - f := NewFetcher(mockStorage, &mockClient{}, WithLogger(zap.NewNop())) + f := New(mockStorage, &mockClient{}, WithLogger(zap.NewNop())) assert.ErrorIs( t, @@ -64,7 +64,7 @@ func TestNodeFetcher_FetchTransactions_Invalid(t *testing.T) { ) // Create the fetcher - f := NewFetcher(mockStorage, mockClient) + f := New(mockStorage, mockClient) assert.ErrorIs( t, @@ -100,7 +100,7 @@ func TestNodeFetcher_FetchTransactions_Invalid(t *testing.T) { ) // Create the fetcher - f := NewFetcher(mockStorage, mockClient) + f := New(mockStorage, mockClient) assert.ErrorIs( t, @@ -149,7 +149,7 @@ func TestNodeFetcher_FetchTransactions_Invalid(t *testing.T) { ) // Create the fetcher - f := NewFetcher(mockStorage, mockClient) + f := New(mockStorage, mockClient) assert.ErrorIs( t, @@ -230,7 +230,7 @@ func TestNodeFetcher_FetchTransactions_Valid(t *testing.T) { ) // Create the fetcher - f := NewFetcher(mockStorage, mockClient) + f := New(mockStorage, mockClient) // Create the context ctx, cancelFn := context.WithCancel(context.Background()) @@ -314,7 +314,7 @@ func TestNodeFetcher_FetchTransactions_Valid(t 
*testing.T) { ) // Create the fetcher - f := NewFetcher(mockStorage, mockClient) + f := New(mockStorage, mockClient) // Create the context ctx, cancelFn := context.WithCancel(context.Background()) diff --git a/fetch/mocks_test.go b/fetch/mocks_test.go index 21922e20..86af3842 100644 --- a/fetch/mocks_test.go +++ b/fetch/mocks_test.go @@ -6,13 +6,15 @@ import ( ) type ( - getLatestSavedHeightDelegate func() (int64, error) - saveBlockDelegate func(*types.Block) error - saveTxDelegate func(*types.TxResult) error + getLatestHeightDelegate func() (int64, error) + saveLatestHeightDelegate func(int64) error + saveBlockDelegate func(*types.Block) error + saveTxDelegate func(*types.TxResult) error ) type mockStorage struct { - getLatestSavedHeightFn getLatestSavedHeightDelegate + getLatestSavedHeightFn getLatestHeightDelegate + saveLatestHeightFn saveLatestHeightDelegate saveBlockFn saveBlockDelegate saveTxFn saveTxDelegate } @@ -25,6 +27,14 @@ func (m *mockStorage) GetLatestHeight() (int64, error) { return 0, nil } +func (m *mockStorage) SaveLatestHeight(blockNum int64) error { + if m.saveLatestHeightFn != nil { + return m.saveLatestHeightFn(blockNum) + } + + return nil +} + func (m *mockStorage) SaveTx(tx *types.TxResult) error { if m.saveTxFn != nil { return m.saveTxFn(tx) diff --git a/fetch/slots.go b/fetch/slots.go new file mode 100644 index 00000000..8043b30e --- /dev/null +++ b/fetch/slots.go @@ -0,0 +1,141 @@ +package fetch + +import ( + "github.com/gnolang/gno/tm2/pkg/bft/types" + queue "github.com/madz-lab/insertion-queue" +) + +type chunk struct { + blocks []*types.Block + results [][]*types.TxResult +} + +// slot is a single chunk slot +type slot struct { + chunk *chunk // retrieved data chunk + chunkRange chunkRange // retrieved data chunk range +} + +func (s *slot) Less(i queue.Item) bool { + other, ok := i.(*slot) + if !ok { + return false + } + + return s.chunkRange.less(other.chunkRange) +} + +// chunkRange is the data sequence range for the data +type chunkRange struct { + from int64 // sequence from (inclusive) + to int64 // sequence to (inclusive) +} + +// less returns a flag indicating if the current chunk range is less than the other +func (c chunkRange) less(other chunkRange) bool { + return c.from < other.from +} + +// slots is the fixed priority-queue slot representation +type slots struct { + queue.Queue + + maxSlots int +} + +// getSlot fetches the slot at the specific index +func (s *slots) getSlot(index int) *slot { + if s.Len()-1 < index { + return nil + } + + return s.Index(index).(*slot) +} + +// setChunk sets the chunk for the specified index +func (s *slots) setChunk(index int, chunk *chunk) { + item := s.getSlot(index) + item.chunk = chunk + + s.Queue[index] = item +} + +// reserveChunkRanges reserves empty chunk ranges, and returns them, if any +func (s *slots) reserveChunkRanges(start, end, maxChunkSize int64) []chunkRange { + freeSlots := s.maxSlots - s.Len() + + gaps := s.findGaps(start, end, maxChunkSize) + maxRanges := min(len(gaps), freeSlots) + chunkRanges := make([]chunkRange, maxRanges) + + for index, gap := range gaps[:maxRanges] { + chunkRanges[index] = gap + + s.Push(&slot{ + chunk: nil, + chunkRange: gap, + }) + } + + return chunkRanges +} + +// findGaps finds the chunk gaps in the specified range +func (s *slots) findGaps(start, end, maxSize int64) []chunkRange { + var ( + chunkRanges []chunkRange // contains all gaps + dividedGaps []chunkRange // contains at most maxSize gaps + ) + + if s.Len() == 0 { + chunkRanges = append(chunkRanges, chunkRange{ 
+ from: start, + to: end, + }) + } else { + prevTo := start - 1 + + for _, partRaw := range s.Queue { + part := partRaw.(*slot) + + if part.chunkRange.from > prevTo+1 { + chunkRanges = append(chunkRanges, chunkRange{ + from: prevTo + 1, + to: part.chunkRange.from - 1, + }) + } + + prevTo = part.chunkRange.to + } + + if prevTo < end { + chunkRanges = append(chunkRanges, chunkRange{ + from: prevTo + 1, + to: end, + }) + } + } + + for _, gap := range chunkRanges { + if gap.to-gap.from+1 <= maxSize { + dividedGaps = append(dividedGaps, gap) + + continue + } + + for i := gap.from; i <= gap.to; i += maxSize { + newGap := chunkRange{ + from: i, + to: i + maxSize - 1, + } + + if newGap.to > gap.to { + newGap.to = gap.to + } + + dividedGaps = append(dividedGaps, newGap) + } + } + + return dividedGaps +} diff --git a/fetch/types.go b/fetch/types.go index 9f8283c3..858fd4a9 100644 --- a/fetch/types.go +++ b/fetch/types.go @@ -7,9 +7,12 @@ import ( // Storage defines the transaction storage interface type Storage interface { - // GetLatestSavedHeight returns the latest block height from the storage + // GetLatestHeight returns the latest block height from the storage GetLatestHeight() (int64, error) + // SaveLatestHeight saves the latest block height to the storage + SaveLatestHeight(int64) error + // SaveBlock saves the block to the permanent storage SaveBlock(block *types.Block) error diff --git a/fetch/worker.go b/fetch/worker.go new file mode 100644 index 00000000..f790b99a --- /dev/null +++ b/fetch/worker.go @@ -0,0 +1,78 @@ +package fetch + +import ( + "context" + + "github.com/gnolang/gno/tm2/pkg/bft/types" +) + +// workerInfo is the work context for the fetch routine +type workerInfo struct { + resCh chan<- *workerResponse // response channel + chunkRange chunkRange // data range +} + +// workerResponse is the routine response +type workerResponse struct { + error error // encountered error, if any + chunk *chunk // the fetched chunk + chunkRange chunkRange // the fetched chunk range +} + +// handleChunk fetches the chunk from the client +func handleChunk( + ctx context.Context, + client Client, + info *workerInfo, +) { + var ( + err error + + c = &chunk{ + blocks: make([]*types.Block, 0), + results: make([][]*types.TxResult, 0), + } + ) + + for blockNum := info.chunkRange.from; blockNum <= info.chunkRange.to; blockNum++ { + // Get block info from the chain + block, getErr := client.GetBlock(blockNum) + if getErr != nil { + break + } + + results := make([]*types.TxResult, block.Block.NumTxs) + + if block.Block.NumTxs != 0 { + // Get the transaction execution results + txResults, resErr := client.GetBlockResults(blockNum) + if resErr != nil { + break + } + + // Save the transaction result to the storage + for index, tx := range block.Block.Txs { + results[index] = &types.TxResult{ + Height: block.Block.Height, + Index: uint32(index), + Tx: tx, + Response: txResults.Results.DeliverTxs[index], + } + } + } + + c.blocks = append(c.blocks, block.Block) + c.results = append(c.results, results) + } + + response := &workerResponse{ + error: err, + chunk: c, + chunkRange: info.chunkRange, + } + + select { + case <-ctx.Done(): + case info.resCh <- response: + } +} diff --git a/go.mod b/go.mod index ca547997..a0f3bc07 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,12 @@ require ( github.com/cockroachdb/pebble v1.0.0 github.com/go-chi/chi/v5 v5.0.11 github.com/google/uuid v1.5.0 + github.com/madz-lab/insertion-queue v0.0.0-20230520191346-295d3348f63a github.com/olahol/melody v1.1.4 
github.com/peterbourgon/ff/v3 v3.4.0 github.com/stretchr/testify v1.8.4 go.uber.org/zap v1.26.0 + golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 ) require ( @@ -40,12 +42,14 @@ require ( github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/linxGnu/grocksdb v1.8.5 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect + github.com/pelletier/go-toml v1.9.5 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v1.12.0 // indirect github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect + github.com/rs/cors v1.10.1 // indirect github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c // indirect go.etcd.io/bbolt v1.3.8 // indirect go.opencensus.io v0.22.5 // indirect diff --git a/go.sum b/go.sum index 4fdeb390..e4b7eb60 100644 --- a/go.sum +++ b/go.sum @@ -325,6 +325,8 @@ github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6 github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/linxGnu/grocksdb v1.8.5 h1:Okfk5B1h0ikCYdDM7Tc5yJUS8LTwAmMBq5IPWTmOLPs= github.com/linxGnu/grocksdb v1.8.5/go.mod h1:xZCIb5Muw+nhbDK4Y5UJuOrin5MceOuiXkVUR7vp4WY= +github.com/madz-lab/insertion-queue v0.0.0-20230520191346-295d3348f63a h1:KxTVE11SAJzp+PnqaCw0Rzb/of6mQexpTIyZwM/JTJU= +github.com/madz-lab/insertion-queue v0.0.0-20230520191346-295d3348f63a/go.mod h1:kWWMMyVnsC79rIkENl7FQUU2EQql12s8ETwjsDBiMtA= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= @@ -403,6 +405,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rs/cors v1.10.1 h1:L0uuZVXIKlI1SShY2nhFfo44TYvDPQ1w4oFkUJNfhyo= +github.com/rs/cors v1.10.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= diff --git a/serve/handler_test.go b/serve/handler_test.go index be7ce162..25849417 100644 --- a/serve/handler_test.go +++ b/serve/handler_test.go @@ -161,7 +161,7 @@ func newWebServer(t *testing.T, callbacks ...func(s *JSONRPC)) *testWebServer { } // Hook up the JSON-RPC server to the mux - mux.Mount("/", s.SetupRouter()) + mux.Mount("/", s.setupRouter()) return webServer } diff --git a/serve/jsonrpc.go b/serve/jsonrpc.go index eba178b4..195a50ee 100644 --- a/serve/jsonrpc.go +++ b/serve/jsonrpc.go @@ -1,9 +1,13 @@ package serve import ( + "context" "encoding/json" + "errors" "io" + "net" "net/http" + "time" "github.com/gnolang/tx-indexer/serve/conns" "github.com/gnolang/tx-indexer/serve/conns/wsconn" @@ -17,6 +21,7 @@ import ( "github.com/google/uuid" 
"github.com/olahol/melody" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) const ( @@ -29,6 +34,10 @@ const ( wsIDKey = "ws-id" ) +const ( + DefaultListenAddress = "0.0.0.0:8545" +) + // maxSizeMiddleware enforces a 1MB size limit on the request body func maxSizeMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -41,6 +50,10 @@ func maxSizeMiddleware(next http.Handler) http.Handler { // JSONRPC is the JSONRPC server instance, that is capable // of handling both HTTP and WS requests type JSONRPC struct { + // wsConns keeps track of WS connections + // that need to be directly accessed by certain methods + wsConns conns.ConnectionManager + logger *zap.Logger // handlers are the registered method handlers @@ -49,17 +62,17 @@ type JSONRPC struct { // ws handles incoming and active WS connections ws *melody.Melody - // wsConns keeps track of WS connections - // that need to be directly accessed by certain methods - wsConns conns.ConnectionManager + // listenAddress is the serve address + listenAddress string } // NewJSONRPC creates a new instance of the JSONRPC server func NewJSONRPC(opts ...Option) *JSONRPC { j := &JSONRPC{ - logger: zap.NewNop(), - handlers: newHandlers(), - ws: melody.New(), + logger: zap.NewNop(), + handlers: newHandlers(), + ws: melody.New(), + listenAddress: DefaultListenAddress, } for _, opt := range opts { @@ -75,8 +88,52 @@ func NewJSONRPC(opts ...Option) *JSONRPC { return j } -// SetupRouter sets up the request router for the indexer service -func (j *JSONRPC) SetupRouter() *chi.Mux { +// Serve serves the JSON-RPC server +func (j *JSONRPC) Serve(ctx context.Context) error { + faucet := &http.Server{ + Addr: j.listenAddress, + Handler: j.setupRouter(), + ReadHeaderTimeout: 60 * time.Second, + } + + group, gCtx := errgroup.WithContext(ctx) + + group.Go(func() error { + defer j.logger.Info("JSON-RPC server shut down") + + ln, err := net.Listen("tcp", faucet.Addr) + if err != nil { + return err + } + + j.logger.Info( + "JSON-RPC server started", + zap.String("address", ln.Addr().String()), + ) + + if err := faucet.Serve(ln); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + + return nil + }) + + group.Go(func() error { + <-gCtx.Done() + + j.logger.Info("JSON-RPC server to be shut down") + + wsCtx, cancel := context.WithTimeout(context.Background(), time.Second*30) + defer cancel() + + return faucet.Shutdown(wsCtx) + }) + + return group.Wait() +} + +// setupRouter sets up the request router for the indexer service +func (j *JSONRPC) setupRouter() *chi.Mux { mux := chi.NewRouter() // Set up the middlewares diff --git a/serve/options.go b/serve/options.go index 043cb3a9..9ec437be 100644 --- a/serve/options.go +++ b/serve/options.go @@ -11,3 +11,11 @@ func WithLogger(logger *zap.Logger) Option { s.logger = logger } } + +// WithListenAddress sets the listen address +// for the JSON-RPC server +func WithListenAddress(address string) Option { + return func(s *JSONRPC) { + s.listenAddress = address + } +}