Skip to content

Commit

Permalink
make migration driver working with secure clickhouse instance
Browse files Browse the repository at this point in the history
  • Loading branch information
riccardo-gnosis committed Sep 2, 2024
1 parent 4da1ec9 commit 2c9960d
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 21 deletions.
18 changes: 13 additions & 5 deletions pkg/analyzer/chain_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,31 @@ func NewChainAnalyzer(
return &ChainAnalyzer{
ctx: ctx,
cancel: cancel,
}, errors.Wrap(err, "unable to read metric.")
}, errors.Wrap(err, "unable to read metric")
}

idbClient, err := db.New(ctx, iConfig.DBUrl)
if err != nil {
return &ChainAnalyzer{
ctx: ctx,
cancel: cancel,
}, errors.Wrap(err, "unable to generate DB Client.")
}, errors.Wrap(err, "unable to generate DB Client")
}

err = idbClient.Connect()
if err != nil {
return &ChainAnalyzer{
ctx: ctx,
cancel: cancel,
}, errors.Wrap(err, "unable to connect DB Client.")
}, errors.Wrap(err, "unable to connect DB Client")
}

err = idbClient.Migrate()
if err != nil {
return &ChainAnalyzer{
ctx: ctx,
cancel: cancel,
}, errors.Wrap(err, "unable to perform DB migrations")
}

// generate the httpAPI client
Expand All @@ -109,7 +117,7 @@ func NewChainAnalyzer(
return &ChainAnalyzer{
ctx: ctx,
cancel: cancel,
}, errors.Wrap(err, "unable to generate API Client.")
}, errors.Wrap(err, "unable to generate API Client")
}

genesisTime := cli.RequestGenesis()
Expand All @@ -120,7 +128,7 @@ func NewChainAnalyzer(
return &ChainAnalyzer{
ctx: ctx,
cancel: cancel,
}, errors.Wrap(err, "unable to generate API Client.")
}, errors.Wrap(err, "unable to generate API Client")
}

idbClient.InitGenesis(genesisTime)
Expand Down
9 changes: 8 additions & 1 deletion pkg/db/high_level_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,15 @@ func (s *DBService) ConnectHighLevel() error {
if err != nil {
return err
}

s.highLevelClient = conn
return conn.Ping(context.Background())
erro := conn.Ping(context.Background())
if erro != nil {
return erro
}

log.Info("high level client is connected")
return nil

}

Expand Down
25 changes: 20 additions & 5 deletions pkg/db/low_level_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,39 @@ package db
import (
"context"
"crypto/tls"
"fmt"
"strings"
"time"

"github.com/ClickHouse/ch-go"
"github.com/ClickHouse/ch-go/proto"
)

func (s *DBService) ConnectLowLevel() error {
func (s *DBService) ConnectLowLevel(retry bool) error {
ctx := context.Background()

opts := ParseChUrlIntoOptionsLowLevel(s.connectionUrl)
opts, migrationUrl := ParseChUrlIntoOptionsLowLevel(s.connectionUrl)

lowLevelConn, err := ch.Dial(ctx, opts)
if err == nil {
s.lowLevelClient = lowLevelConn
err = s.makeMigrations()
s.migrationUrl = migrationUrl
log.Info("low level client is connected")
// err = s.makeMigrations(migrationUrl)
} else {
if retry {
// database may be in idle state, wait 30 seconds and retry another time
log.Warning("database could be idle, service will retry to connect another time in 30 seconds ...")
time.Sleep(30 * time.Second)
return s.ConnectLowLevel(false)
}
}

return err

}

func ParseChUrlIntoOptionsLowLevel(url string) ch.Options {
func ParseChUrlIntoOptionsLowLevel(url string) (ch.Options, string) {
var user string
var password string
var database string
Expand All @@ -48,18 +59,22 @@ func ParseChUrlIntoOptionsLowLevel(url string) ch.Options {
user = strings.Split(credentials, ":")[0]
password = strings.Split(credentials, ":")[1]

log.Print(fqdn)

options := ch.Options{
Address: fqdn,
Database: database,
User: user,
Password: password,
}

migrationDatabaseUrl := fmt.Sprintf("%s?x-multi-statement=true", url)
if strings.Contains(fqdn, "clickhouse.cloud") {
options.TLS = &tls.Config{}
migrationDatabaseUrl = fmt.Sprintf("%s?x-multi-statement=true&x-migrations-table-engine=MergeTree&secure=true", url)
}

return options
return options, migrationDatabaseUrl
}

func (p *DBService) Persist(
Expand Down
17 changes: 11 additions & 6 deletions pkg/db/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,35 @@ import (
)

func (s *DBService) makeMigrations() error {
log.Info("will try to apply migrations ...")

m, err := migrate.New(
"file://pkg/db/migrations",
s.connectionUrl)
s.migrationUrl)

if err != nil {
log.Errorf(err.Error())
log.Errorf("could not create DB migrator: %s", err.Error())
return err
}
log.Infof("applying database migrations...")

log.Infof("now applying database migrations ...")
if err := m.Up(); err != nil {
if err != migrate.ErrNoChange {
log.Errorf(err.Error())
log.Errorf("there was an error while applying migrations: %s", err.Error())
return err
}
}
connErr, dbErr := m.Close()

if connErr != nil {
log.Errorf(connErr.Error())
log.Errorf("there was an error closing migrator connection: %s", connErr.Error())
return connErr
}

if dbErr != nil {
log.Errorf(dbErr.Error())
log.Errorf("there was an error with DB migrator: %s", dbErr.Error())
return dbErr
}

return err
}
19 changes: 15 additions & 4 deletions pkg/db/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type DBService struct {
ctx context.Context
connectionUrl string // the url might not be necessary (better to remove it?¿)

migrationUrl string

lowLevelClient *ch.Client // for bulk loads, mainly insert
highLevelClient driver.Conn // for side tasks, like Select and Delete

Expand Down Expand Up @@ -63,18 +65,27 @@ func New(ctx context.Context, url string, options ...DBServiceOption) (*DBServic

func (s *DBService) Connect() error {

err := s.ConnectHighLevel()
err := s.ConnectLowLevel(true)
if err != nil {
return fmt.Errorf("high level db driver error: %s", err)
return fmt.Errorf("low level db driver error: %s", err)
}

err = s.ConnectLowLevel()
err = s.ConnectHighLevel()
if err != nil {
return fmt.Errorf("low level db driver error: %s", err)
return fmt.Errorf("high level db driver error: %s", err)
}

return nil
}

func (s *DBService) Migrate() error {

err := s.makeMigrations()
if err != nil {
return fmt.Errorf("migration error: %s", err)
}

return nil
}

func WithUrl(url string) DBServiceOption {
Expand Down

0 comments on commit 2c9960d

Please sign in to comment.