Skip to content

Commit

Permalink
Replace fmt.Println with a proper logger
Browse files Browse the repository at this point in the history
Also add a comment on the starting height to be used for the indexer
  • Loading branch information
m-Peter committed Jan 6, 2024
1 parent ed60231 commit f55dc7b
Showing 1 changed file with 25 additions and 18 deletions.
43 changes: 25 additions & 18 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"flag"
"fmt"
"log"
"runtime"

goGrpc "google.golang.org/grpc"
Expand Down Expand Up @@ -45,13 +44,14 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

runServer(config, store)
runIndexer(ctx, store)
logger := zerolog.New(zerolog.NewConsoleWriter()).With().Timestamp().Logger()
runServer(config, store, logger)
runIndexer(ctx, store, logger)

runtime.Goexit()
}

func runIndexer(ctx context.Context, store *storage.Store) {
func runIndexer(ctx context.Context, store *storage.Store, logger zerolog.Logger) {
flowClient, err := grpc.NewBaseClient(
accessURL,
goGrpc.WithTransportCredentials(insecure.NewCredentials()),
Expand All @@ -60,12 +60,16 @@ func runIndexer(ctx context.Context, store *storage.Store) {
panic(err)
}

// TODO(m-Peter) The starting height from which the indexer should
// begins, should either be retrieved from storage (latest height + 1),
// or should be specified through a command-line flag (when starting
// from scratch).
latestBlockHeader, err := flowClient.GetLatestBlockHeader(ctx, true)
if err != nil {
panic(err)
}
fmt.Printf("Latest Block Height: %d\n", latestBlockHeader.Height)
fmt.Printf("Latest Block ID: %s\n", latestBlockHeader.ID)
logger.Info().Msgf("Latest Block Height: %d", latestBlockHeader.Height)
logger.Info().Msgf("Latest Block ID: %s", latestBlockHeader.ID)

data, errChan, initErr := flowClient.SubscribeEventsByBlockHeight(
ctx,
Expand All @@ -76,19 +80,19 @@ func runIndexer(ctx context.Context, store *storage.Store) {
grpc.WithHeartbeatInterval(1),
)
if initErr != nil {
log.Fatalf("could not subscribe to events: %v", initErr)
logger.Error().Msgf("could not subscribe to events: %v", initErr)
}

reconnect := func(height uint64) {
fmt.Printf("Reconnecting at block %d\n", height)
logger.Warn().Msgf("Reconnecting at block height: %d", height)

var err error
flowClient, err := grpc.NewBaseClient(
accessURL,
goGrpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
log.Fatalf("could not create flow client: %v", err)
logger.Error().Msgf("could not create flow client: %v", err)
}

data, errChan, initErr = flowClient.SubscribeEventsByBlockHeight(
Expand All @@ -100,7 +104,7 @@ func runIndexer(ctx context.Context, store *storage.Store) {
grpc.WithHeartbeatInterval(1),
)
if initErr != nil {
log.Fatalf("could not subscribe to events: %v", initErr)
logger.Error().Msgf("could not subscribe to events: %v", initErr)
}
}

Expand All @@ -117,23 +121,23 @@ func runIndexer(ctx context.Context, store *storage.Store) {
if ctx.Err() != nil {
return // graceful shutdown
}
fmt.Println("subscription closed - reconnecting")
logger.Error().Msg("subscription closed - reconnecting")
reconnect(lastHeight + 1)
continue
}

if response.Height != lastHeight+1 {
fmt.Printf("missed events response for block %d\n", lastHeight+1)
logger.Error().Msgf("missed events response for block %d", lastHeight+1)
reconnect(lastHeight)
continue
}

fmt.Printf("block %d %s:\n", response.Height, response.BlockID)
logger.Info().Msgf("block %d %s:", response.Height, response.BlockID)
if len(response.Events) > 0 {
store.StoreBlockHeight(ctx, response.Height)
}
for _, event := range response.Events {
fmt.Printf(" %s\n", event.Type)
logger.Info().Msgf(" %s", event.Type)
}

lastHeight = response.Height
Expand All @@ -148,22 +152,25 @@ func runIndexer(ctx context.Context, store *storage.Store) {
continue
}

fmt.Printf("~~~ ERROR: %s ~~~\n", err.Error())
logger.Error().Msgf("ERROR: %v", err)
reconnect(lastHeight + 1)
continue
}
}
}

func runServer(config *api.Config, store *storage.Store) {
srv := api.NewHTTPServer(zerolog.Logger{}, rpc.DefaultHTTPTimeouts)
func runServer(config *api.Config, store *storage.Store, logger zerolog.Logger) {
srv := api.NewHTTPServer(logger, rpc.DefaultHTTPTimeouts)
supportedAPIs := api.SupportedAPIs(config, store)

srv.EnableRPC(supportedAPIs)
srv.EnableWS(supportedAPIs)

srv.SetListenAddr("localhost", 8545)

err := srv.Start()
if err != nil {
panic(err)
}
fmt.Println("Server Started: ", srv.ListenAddr())
logger.Info().Msgf("Server Started: %s", srv.ListenAddr())
}

0 comments on commit f55dc7b

Please sign in to comment.