Skip to content

Commit

Permalink
Sleep when no logs (#110)
Browse files Browse the repository at this point in the history
## tl;dr

- Addresses PR feedback and has the indexer sleep for 1 second when no logs are received. 

## Potential improvements

- Start with a smaller sleep time and use exponential backoff
  • Loading branch information
neekolas authored Aug 13, 2024
2 parents 4621c3e + ba98985 commit 3bffba4
Showing 1 changed file with 32 additions and 6 deletions.
38 changes: 32 additions & 6 deletions pkg/indexer/blockchain/rpcLogStreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
// Setting to 0 since we are talking about L2s with low reorg risk
LAG_FROM_HIGHEST_BLOCK = 0
ERROR_SLEEP_TIME = 100 * time.Millisecond
NO_LOGS_SLEEP_TIME = 1 * time.Second
)

// The builder that allows you to configure contract events to listen for
Expand All @@ -32,9 +33,16 @@ func NewRpcLogStreamBuilder(rpcUrl string, logger *zap.Logger) *RpcLogStreamBuil
return &RpcLogStreamBuilder{rpcUrl: rpcUrl, logger: logger}
}

func (c *RpcLogStreamBuilder) ListenForContractEvent(fromBlock int, contractAddress common.Address, topics []common.Hash) <-chan types.Log {
func (c *RpcLogStreamBuilder) ListenForContractEvent(
fromBlock int,
contractAddress common.Address,
topics []common.Hash,
) <-chan types.Log {
eventChannel := make(chan types.Log, 100)
c.contractConfigs = append(c.contractConfigs, contractConfig{fromBlock, contractAddress, topics, eventChannel})
c.contractConfigs = append(
c.contractConfigs,
contractConfig{fromBlock, contractAddress, topics, eventChannel},
)
return eventChannel
}

Expand Down Expand Up @@ -68,7 +76,11 @@ type RpcLogStreamer struct {
logger *zap.Logger
}

func NewRpcLogStreamer(client ChainClient, logger *zap.Logger, watchers []contractConfig) *RpcLogStreamer {
func NewRpcLogStreamer(
client ChainClient,
logger *zap.Logger,
watchers []contractConfig,
) *RpcLogStreamer {
return &RpcLogStreamer{
client: client,
watchers: watchers,
Expand Down Expand Up @@ -97,12 +109,19 @@ func (r *RpcLogStreamer) watchContract(watcher contractConfig) {
default:
logs, nextBlock, err := r.getNextPage(watcher, fromBlock)
if err != nil {
logger.Error("Error getting next page", zap.Int("fromBlock", fromBlock), zap.Error(err))
logger.Error(
"Error getting next page",
zap.Int("fromBlock", fromBlock),
zap.Error(err),
)
time.Sleep(ERROR_SLEEP_TIME)
continue
}

logger.Info("Got logs", zap.Int("numLogs", len(logs)), zap.Int("fromBlock", fromBlock))
if len(logs) == 0 {
time.Sleep(NO_LOGS_SLEEP_TIME)
}
for _, log := range logs {
watcher.channel <- log
}
Expand All @@ -113,7 +132,10 @@ func (r *RpcLogStreamer) watchContract(watcher contractConfig) {
}
}

func (r *RpcLogStreamer) getNextPage(config contractConfig, fromBlock int) (logs []types.Log, nextBlock *int, err error) {
func (r *RpcLogStreamer) getNextPage(
config contractConfig,
fromBlock int,
) (logs []types.Log, nextBlock *int, err error) {
highestBlock, err := r.client.BlockNumber(r.ctx)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -144,7 +166,11 @@ func (r *RpcLogStreamer) getNextPage(config contractConfig, fromBlock int) (logs
return logs, &nextBlockNumber, nil
}

func buildFilterQuery(contractConfig contractConfig, fromBlock int64, toBlock int64) ethereum.FilterQuery {
func buildFilterQuery(
contractConfig contractConfig,
fromBlock int64,
toBlock int64,
) ethereum.FilterQuery {
addresses := []common.Address{contractConfig.contractAddress}
topics := [][]common.Hash{}
for _, topic := range contractConfig.topics {
Expand Down

0 comments on commit 3bffba4

Please sign in to comment.