diff --git a/pkg/analyzer/chain_analyzer.go b/pkg/analyzer/chain_analyzer.go index 0339646..2798be1 100644 --- a/pkg/analyzer/chain_analyzer.go +++ b/pkg/analyzer/chain_analyzer.go @@ -80,7 +80,7 @@ func NewChainAnalyzer( return &ChainAnalyzer{ ctx: ctx, cancel: cancel, - }, errors.Wrap(err, "unable to read metric.") + }, errors.Wrap(err, "unable to read metric") } idbClient, err := db.New(ctx, iConfig.DBUrl) @@ -88,7 +88,7 @@ func NewChainAnalyzer( return &ChainAnalyzer{ ctx: ctx, cancel: cancel, - }, errors.Wrap(err, "unable to generate DB Client.") + }, errors.Wrap(err, "unable to generate DB Client") } err = idbClient.Connect() @@ -96,7 +96,15 @@ func NewChainAnalyzer( return &ChainAnalyzer{ ctx: ctx, cancel: cancel, - }, errors.Wrap(err, "unable to connect DB Client.") + }, errors.Wrap(err, "unable to connect DB Client") + } + + err = idbClient.Migrate() + if err != nil { + return &ChainAnalyzer{ + ctx: ctx, + cancel: cancel, + }, errors.Wrap(err, "unable to perform DB migrations") } // generate the httpAPI client @@ -109,7 +117,7 @@ func NewChainAnalyzer( return &ChainAnalyzer{ ctx: ctx, cancel: cancel, - }, errors.Wrap(err, "unable to generate API Client.") + }, errors.Wrap(err, "unable to generate API Client") } genesisTime := cli.RequestGenesis() @@ -120,7 +128,7 @@ func NewChainAnalyzer( return &ChainAnalyzer{ ctx: ctx, cancel: cancel, - }, errors.Wrap(err, "unable to generate API Client.") + }, errors.Wrap(err, "unable to generate API Client") } idbClient.InitGenesis(genesisTime) diff --git a/pkg/db/high_level_driver.go b/pkg/db/high_level_driver.go index 79c76a0..43182bc 100644 --- a/pkg/db/high_level_driver.go +++ b/pkg/db/high_level_driver.go @@ -19,8 +19,15 @@ func (s *DBService) ConnectHighLevel() error { if err != nil { return err } + s.highLevelClient = conn - return conn.Ping(context.Background()) + erro := conn.Ping(context.Background()) + if erro != nil { + return erro + } + + log.Info("high level client is connected") + return nil } diff --git a/pkg/db/low_level_driver.go b/pkg/db/low_level_driver.go index ec16f36..fbfdcce 100644 --- a/pkg/db/low_level_driver.go +++ b/pkg/db/low_level_driver.go @@ -3,6 +3,7 @@ package db import ( "context" "crypto/tls" + "fmt" "strings" "time" @@ -10,21 +11,31 @@ import ( "github.com/ClickHouse/ch-go/proto" ) -func (s *DBService) ConnectLowLevel() error { +func (s *DBService) ConnectLowLevel(retry bool) error { ctx := context.Background() - opts := ParseChUrlIntoOptionsLowLevel(s.connectionUrl) + opts, migrationUrl := ParseChUrlIntoOptionsLowLevel(s.connectionUrl) + lowLevelConn, err := ch.Dial(ctx, opts) if err == nil { s.lowLevelClient = lowLevelConn - err = s.makeMigrations() + s.migrationUrl = migrationUrl + log.Info("low level client is connected") + // err = s.makeMigrations(migrationUrl) + } else { + if retry { + // database may be in idle state, wait 30 seconds and retry another time + log.Warning("database could be idle, service will retry to connect another time in 30 seconds ...") + time.Sleep(30 * time.Second) + return s.ConnectLowLevel(false) + } } return err } -func ParseChUrlIntoOptionsLowLevel(url string) ch.Options { +func ParseChUrlIntoOptionsLowLevel(url string) (ch.Options, string) { var user string var password string var database string @@ -48,6 +59,8 @@ func ParseChUrlIntoOptionsLowLevel(url string) ch.Options { user = strings.Split(credentials, ":")[0] password = strings.Split(credentials, ":")[1] + log.Print(fqdn) + options := ch.Options{ Address: fqdn, Database: database, @@ -55,11 +68,13 @@ func ParseChUrlIntoOptionsLowLevel(url string) ch.Options { Password: password, } + migrationDatabaseUrl := fmt.Sprintf("%s?x-multi-statement=true", url) if strings.Contains(fqdn, "clickhouse.cloud") { options.TLS = &tls.Config{} + migrationDatabaseUrl = fmt.Sprintf("%s?x-multi-statement=true&x-migrations-table-engine=MergeTree&secure=true", url) } - return options + return options, migrationDatabaseUrl } func (p *DBService) Persist( diff --git a/pkg/db/migrate.go b/pkg/db/migrate.go index 36302c4..dab21b0 100644 --- a/pkg/db/migrate.go +++ b/pkg/db/migrate.go @@ -8,30 +8,35 @@ import ( ) func (s *DBService) makeMigrations() error { + log.Info("will try to apply migrations ...") m, err := migrate.New( "file://pkg/db/migrations", - s.connectionUrl) + s.migrationUrl) + if err != nil { - log.Errorf(err.Error()) + log.Errorf("could not create DB migrator: %s", err.Error()) return err } - log.Infof("applying database migrations...") + + log.Infof("now applying database migrations ...") if err := m.Up(); err != nil { if err != migrate.ErrNoChange { - log.Errorf(err.Error()) + log.Errorf("there was an error while applying migrations: %s", err.Error()) return err } } connErr, dbErr := m.Close() if connErr != nil { - log.Errorf(connErr.Error()) + log.Errorf("there was an error closing migrator connection: %s", connErr.Error()) return connErr } + if dbErr != nil { - log.Errorf(dbErr.Error()) + log.Errorf("there was an error with DB migrator: %s", dbErr.Error()) return dbErr } + return err } diff --git a/pkg/db/service.go b/pkg/db/service.go index 0565e16..cb2a067 100644 --- a/pkg/db/service.go +++ b/pkg/db/service.go @@ -32,6 +32,8 @@ type DBService struct { ctx context.Context connectionUrl string // the url might not be necessary (better to remove it?¿) + migrationUrl string + lowLevelClient *ch.Client // for bulk loads, mainly insert highLevelClient driver.Conn // for side tasks, like Select and Delete @@ -63,18 +65,27 @@ func New(ctx context.Context, url string, options ...DBServiceOption) (*DBServic func (s *DBService) Connect() error { - err := s.ConnectHighLevel() + err := s.ConnectLowLevel(true) if err != nil { - return fmt.Errorf("high level db driver error: %s", err) + return fmt.Errorf("low level db driver error: %s", err) } - err = s.ConnectLowLevel() + err = s.ConnectHighLevel() if err != nil { - return fmt.Errorf("low level db driver error: %s", err) + return fmt.Errorf("high level db driver error: %s", err) } return nil +} +func (s *DBService) Migrate() error { + + err := s.makeMigrations() + if err != nil { + return fmt.Errorf("migration error: %s", err) + } + + return nil } func WithUrl(url string) DBServiceOption {