diff --git a/cmd/server/main.go b/cmd/server/main.go index 52d1b4d0..67c4eaec 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -4,7 +4,6 @@ import ( "context" "flag" "fmt" - "log" "runtime" goGrpc "google.golang.org/grpc" @@ -45,13 +44,14 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - runServer(config, store) - runIndexer(ctx, store) + logger := zerolog.New(zerolog.NewConsoleWriter()).With().Timestamp().Logger() + runServer(config, store, logger) + runIndexer(ctx, store, logger) runtime.Goexit() } -func runIndexer(ctx context.Context, store *storage.Store) { +func runIndexer(ctx context.Context, store *storage.Store, logger zerolog.Logger) { flowClient, err := grpc.NewBaseClient( accessURL, goGrpc.WithTransportCredentials(insecure.NewCredentials()), @@ -60,12 +60,16 @@ func runIndexer(ctx context.Context, store *storage.Store) { panic(err) } + // TODO(m-Peter) The starting height from which the indexer should + // begins, should either be retrieved from storage (latest height + 1), + // or should be specified through a command-line flag (when starting + // from scratch). latestBlockHeader, err := flowClient.GetLatestBlockHeader(ctx, true) if err != nil { panic(err) } - fmt.Printf("Latest Block Height: %d\n", latestBlockHeader.Height) - fmt.Printf("Latest Block ID: %s\n", latestBlockHeader.ID) + logger.Info().Msgf("Latest Block Height: %d", latestBlockHeader.Height) + logger.Info().Msgf("Latest Block ID: %s", latestBlockHeader.ID) data, errChan, initErr := flowClient.SubscribeEventsByBlockHeight( ctx, @@ -76,11 +80,11 @@ func runIndexer(ctx context.Context, store *storage.Store) { grpc.WithHeartbeatInterval(1), ) if initErr != nil { - log.Fatalf("could not subscribe to events: %v", initErr) + logger.Error().Msgf("could not subscribe to events: %v", initErr) } reconnect := func(height uint64) { - fmt.Printf("Reconnecting at block %d\n", height) + logger.Warn().Msgf("Reconnecting at block height: %d", height) var err error flowClient, err := grpc.NewBaseClient( @@ -88,7 +92,7 @@ func runIndexer(ctx context.Context, store *storage.Store) { goGrpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { - log.Fatalf("could not create flow client: %v", err) + logger.Error().Msgf("could not create flow client: %v", err) } data, errChan, initErr = flowClient.SubscribeEventsByBlockHeight( @@ -100,7 +104,7 @@ func runIndexer(ctx context.Context, store *storage.Store) { grpc.WithHeartbeatInterval(1), ) if initErr != nil { - log.Fatalf("could not subscribe to events: %v", initErr) + logger.Error().Msgf("could not subscribe to events: %v", initErr) } } @@ -117,23 +121,23 @@ func runIndexer(ctx context.Context, store *storage.Store) { if ctx.Err() != nil { return // graceful shutdown } - fmt.Println("subscription closed - reconnecting") + logger.Error().Msg("subscription closed - reconnecting") reconnect(lastHeight + 1) continue } if response.Height != lastHeight+1 { - fmt.Printf("missed events response for block %d\n", lastHeight+1) + logger.Error().Msgf("missed events response for block %d", lastHeight+1) reconnect(lastHeight) continue } - fmt.Printf("block %d %s:\n", response.Height, response.BlockID) + logger.Info().Msgf("block %d %s:", response.Height, response.BlockID) if len(response.Events) > 0 { store.StoreBlockHeight(ctx, response.Height) } for _, event := range response.Events { - fmt.Printf(" %s\n", event.Type) + logger.Info().Msgf(" %s", event.Type) } lastHeight = response.Height @@ -148,22 +152,25 @@ func runIndexer(ctx context.Context, store *storage.Store) { continue } - fmt.Printf("~~~ ERROR: %s ~~~\n", err.Error()) + logger.Error().Msgf("ERROR: %v", err) reconnect(lastHeight + 1) continue } } } -func runServer(config *api.Config, store *storage.Store) { - srv := api.NewHTTPServer(zerolog.Logger{}, rpc.DefaultHTTPTimeouts) +func runServer(config *api.Config, store *storage.Store, logger zerolog.Logger) { + srv := api.NewHTTPServer(logger, rpc.DefaultHTTPTimeouts) supportedAPIs := api.SupportedAPIs(config, store) + srv.EnableRPC(supportedAPIs) srv.EnableWS(supportedAPIs) + srv.SetListenAddr("localhost", 8545) + err := srv.Start() if err != nil { panic(err) } - fmt.Println("Server Started: ", srv.ListenAddr()) + logger.Info().Msgf("Server Started: %s", srv.ListenAddr()) }