From 4f24c40715042f0c16002f9e7013b0d9c92392a0 Mon Sep 17 00:00:00 2001 From: marcosQuesada Date: Fri, 14 Jul 2023 12:28:29 +0200 Subject: [PATCH] pass proper context, track errors on client creations --- replicator/.gitignore | 1 + replicator/replicator.go | 117 ++++++++++++++++++++++----------------- 2 files changed, 68 insertions(+), 50 deletions(-) diff --git a/replicator/.gitignore b/replicator/.gitignore index a22508d..2754268 100644 --- a/replicator/.gitignore +++ b/replicator/.gitignore @@ -1 +1,2 @@ replicator +.idea diff --git a/replicator/replicator.go b/replicator/replicator.go index 6e7d97d..4ae30e8 100644 --- a/replicator/replicator.go +++ b/replicator/replicator.go @@ -22,10 +22,13 @@ import ( "encoding/hex" "errors" "flag" + "fmt" "log" "os" + "os/signal" "regexp" "sort" + "syscall" "time" "github.com/codenotary/immudb/pkg/api/schema" @@ -40,6 +43,7 @@ var ( Buildtime = "00" Commit = "00" AESKey = "NOKEY" + signals = make(chan os.Signal, 1) ) var config struct { @@ -117,6 +121,9 @@ func init() { if s, err := aesdecrypt(config.FollowerPassword); err == nil { config.FollowerPassword = s } + + // Signal + signal.Notify(signals, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) } func aesdecrypt(s string) (string, error) { @@ -156,23 +163,17 @@ func sleep(t0 int64) { time.Sleep(time.Duration(sleepTime) * time.Second) } -func connect(addr string, port int, username, password string) (context.Context, immuclient.ImmuClient) { - ctx := context.Background() +func connect(ctx context.Context, addr string, port int, username, password string) (immuclient.ImmuClient, error) { opts := immuclient.DefaultOptions().WithAddress(addr).WithPort(port) - client, err := immuclient.NewImmuClient(opts) - if err != nil { - log.Printf("Failed to connect to %s:%d. Reason: %s", addr, port, err.Error()) - return ctx, nil - } + client := immuclient.NewClient() + client.Options = opts - login, err := client.Login(ctx, []byte(username), []byte(password)) - if err != nil { - log.Printf("Failed to login to %s:%d. Reason: %s", addr, port, err.Error()) - return ctx, nil + if err := client.OpenSession(ctx, []byte(username), []byte(password), addr); err != nil { + return client, fmt.Errorf("unable to open session, error %w", err) } - ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("authorization", login.GetToken())) - return ctx, client + + return client, nil } func db_list(ctx context.Context, client immuclient.ImmuClient) []string { @@ -192,14 +193,14 @@ const ( OP_DEL ) -type repl_operation struct { +type replOperation struct { op int src_db string dst_db string } -func process_db_list(master_list, replica_list []string) []repl_operation { - var oplist []repl_operation +func processDbList(master_list, replica_list []string) []replOperation { + var oplist []replOperation sort.Strings(master_list) sort.Strings(replica_list) master_has_defaultdb := false @@ -225,7 +226,7 @@ func process_db_list(master_list, replica_list []string) []repl_operation { } } if master_has_defaultdb && !replica_has_defaultdb_bak { - oplist = append(oplist, repl_operation{ + oplist = append(oplist, replOperation{ op: OP_ADD, src_db: "defaultdb", dst_db: "defaultdbbak", @@ -240,7 +241,7 @@ func process_db_list(master_list, replica_list []string) []repl_operation { ri += 1 } else if master_list[mi] < replica_list[ri] { log.Printf("Missing %s, need replication", master_list[mi]) - oplist = append(oplist, repl_operation{ + oplist = append(oplist, replOperation{ op: OP_ADD, src_db: master_list[mi], dst_db: master_list[mi], @@ -248,7 +249,7 @@ func process_db_list(master_list, replica_list []string) []repl_operation { mi += 1 } else { log.Printf("Deleted %s, stop replication", replica_list[ri]) - oplist = append(oplist, repl_operation{ + oplist = append(oplist, replOperation{ op: OP_DEL, src_db: replica_list[ri], dst_db: replica_list[ri], @@ -258,7 +259,7 @@ func process_db_list(master_list, replica_list []string) []repl_operation { } for mi < len(master_list) { log.Printf("Missing %s, need replication", master_list[mi]) - oplist = append(oplist, repl_operation{ + oplist = append(oplist, replOperation{ op: OP_ADD, src_db: master_list[mi], dst_db: master_list[mi], @@ -267,7 +268,7 @@ func process_db_list(master_list, replica_list []string) []repl_operation { } for ri < len(replica_list) { log.Printf("Deleted %s, stop replication", replica_list[ri]) - oplist = append(oplist, repl_operation{ + oplist = append(oplist, replOperation{ op: OP_DEL, src_db: replica_list[ri], dst_db: replica_list[ri], @@ -277,14 +278,14 @@ func process_db_list(master_list, replica_list []string) []repl_operation { return oplist } -func check_user_exists(ctx context.Context, m_cli immuclient.ImmuClient, user string) bool { - userlist, err := m_cli.ListUsers(ctx) +func checkUserExists(ctx context.Context, m_cli immuclient.ImmuClient, user string) bool { + u, err := m_cli.ListUsers(ctx) if err != nil { log.Printf("Failed to get user list. Reason: %s", err.Error()) return false } - for _, u := range userlist.Users { + for _, u := range u.Users { if string(u.User) == user { return true } @@ -292,7 +293,7 @@ func check_user_exists(ctx context.Context, m_cli immuclient.ImmuClient, user st return false } -func config_replica(m_ctx context.Context, m_cli immuclient.ImmuClient, r_ctx context.Context, r_cli immuclient.ImmuClient, src_db, dst_db string) error { +func configReplica(m_ctx context.Context, m_cli immuclient.ImmuClient, r_ctx context.Context, r_cli immuclient.ImmuClient, src_db, dst_db string) error { log.Printf("CONFIG_REPLICA: %s:%s -> %s:%s", config.MasterAddr, src_db, config.ReplicaAddr, dst_db) log.Printf("Fetching settings from master database %s", src_db) udr, err := m_cli.UseDatabase(m_ctx, &schema.Database{DatabaseName: src_db}) @@ -306,7 +307,7 @@ func config_replica(m_ctx context.Context, m_cli immuclient.ImmuClient, r_ctx co log.Printf("Can't fetch setting from master database %s: %s", src_db, err.Error()) return err } - if check_user_exists(ctx, m_cli, config.FollowerUsername) { + if checkUserExists(ctx, m_cli, config.FollowerUsername) { log.Printf("User already exists on master database %s", src_db) err = m_cli.ChangePermission(ctx, schema.PermissionAction_GRANT, config.FollowerUsername, src_db, auth.PermissionAdmin) if err != nil { @@ -365,48 +366,64 @@ func config_replica(m_ctx context.Context, m_cli immuclient.ImmuClient, r_ctx co return nil } -func analyze_db() { - master_ctx, master_cli := connect(config.MasterAddr, config.MasterPort, config.MasterUsername, config.MasterPassword) - if master_cli == nil { - return +func analyze_db(ctx context.Context) error { + tctx, cancel := context.WithTimeout(ctx, time.Second) + masterClient, err := connect(tctx, config.MasterAddr, config.MasterPort, config.MasterUsername, config.MasterPassword) + if err != nil { + cancel() + return fmt.Errorf("unable to connect to Master Addr %s Port %d error %w", config.MasterAddr, config.Port, err) } - defer func() { _ = master_cli.Disconnect() }() - replica_ctx, replica_cli := connect(config.ReplicaAddr, config.ReplicaPort, config.ReplicaUsername, config.ReplicaPassword) - if replica_cli == nil { - return + + defer func() { _ = masterClient.CloseSession(ctx) }() + + replicaClient, err := connect(tctx, config.ReplicaAddr, config.ReplicaPort, config.ReplicaUsername, config.ReplicaPassword) + if err != nil { + cancel() + return fmt.Errorf("unable to connect to Replica Addr %s Port %d error %w", config.ReplicaAddr, config.ReplicaPort, err) } - defer func() { _ = replica_cli.Disconnect() }() + defer func() { _ = replicaClient.CloseSession(ctx) }() + cancel() - master_list := db_list(master_ctx, master_cli) - replica_list := db_list(replica_ctx, replica_cli) - oplist := process_db_list(master_list, replica_list) - for _, o := range oplist { + masterDBs := db_list(ctx, masterClient) + replicaDBs := db_list(ctx, replicaClient) + ops := processDbList(masterDBs, replicaDBs) + for _, o := range ops { log.Printf("- %v", o) switch o.op { case OP_ADD: - _ = config_replica(master_ctx, master_cli, replica_ctx, replica_cli, o.src_db, o.dst_db) + _ = configReplica(ctx, masterClient, ctx, replicaClient, o.src_db, o.dst_db) case OP_DEL: log.Printf("Ignoring orphaned database %s", o.dst_db) } } + + return nil } -func replicator_loop() { - t0 := time.Now().Unix() +func replicatorLoop(ctx context.Context) { + t := time.NewTicker(time.Second * 60) + for { - if BackupInfo.is_running() { - log.Printf("Backup in progress, skipping analysis") - } else { - log.Printf("Analyzing") - analyze_db() + select { + case <-t.C: + if BackupInfo.is_running() { + continue + } + if err := analyze_db(ctx); err != nil { + log.Printf("unexpected error analyzing DBs %v", err) + } + + case <-ctx.Done(): + return } - log.Printf("Sleeping") - sleep(t0) } } func main() { log.Printf("Replicator %s [%s] @ %s", Version, Commit, Buildtime) + ctx, cancel := context.WithCancel(context.Background()) go rest_interface() - replicator_loop() + go replicatorLoop(ctx) + <-signals + cancel() }