Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pass proper context, track errors on client creations #11

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions replicator/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
replicator
.idea
117 changes: 67 additions & 50 deletions replicator/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import (
"encoding/hex"
"errors"
"flag"
"fmt"
"log"
"os"
"os/signal"
"regexp"
"sort"
"syscall"
"time"

"github.com/codenotary/immudb/pkg/api/schema"
Expand All @@ -40,6 +43,7 @@ var (
Buildtime = "00"
Commit = "00"
AESKey = "NOKEY"
signals = make(chan os.Signal, 1)
)

var config struct {
Expand Down Expand Up @@ -117,6 +121,9 @@ func init() {
if s, err := aesdecrypt(config.FollowerPassword); err == nil {
config.FollowerPassword = s
}

// Signal
signal.Notify(signals, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
}

func aesdecrypt(s string) (string, error) {
Expand Down Expand Up @@ -156,23 +163,17 @@ func sleep(t0 int64) {
time.Sleep(time.Duration(sleepTime) * time.Second)
}

func connect(addr string, port int, username, password string) (context.Context, immuclient.ImmuClient) {
ctx := context.Background()
func connect(ctx context.Context, addr string, port int, username, password string) (immuclient.ImmuClient, error) {
opts := immuclient.DefaultOptions().WithAddress(addr).WithPort(port)

client, err := immuclient.NewImmuClient(opts)
if err != nil {
log.Printf("Failed to connect to %s:%d. Reason: %s", addr, port, err.Error())
return ctx, nil
}
client := immuclient.NewClient()
client.Options = opts

login, err := client.Login(ctx, []byte(username), []byte(password))
if err != nil {
log.Printf("Failed to login to %s:%d. Reason: %s", addr, port, err.Error())
return ctx, nil
if err := client.OpenSession(ctx, []byte(username), []byte(password), addr); err != nil {
return client, fmt.Errorf("unable to open session, error %w", err)
}
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("authorization", login.GetToken()))
return ctx, client

return client, nil
}

func db_list(ctx context.Context, client immuclient.ImmuClient) []string {
Expand All @@ -192,14 +193,14 @@ const (
OP_DEL
)

type repl_operation struct {
type replOperation struct {
op int
src_db string
dst_db string
}

func process_db_list(master_list, replica_list []string) []repl_operation {
var oplist []repl_operation
func processDbList(master_list, replica_list []string) []replOperation {
var oplist []replOperation
sort.Strings(master_list)
sort.Strings(replica_list)
master_has_defaultdb := false
Expand All @@ -225,7 +226,7 @@ func process_db_list(master_list, replica_list []string) []repl_operation {
}
}
if master_has_defaultdb && !replica_has_defaultdb_bak {
oplist = append(oplist, repl_operation{
oplist = append(oplist, replOperation{
op: OP_ADD,
src_db: "defaultdb",
dst_db: "defaultdbbak",
Expand All @@ -240,15 +241,15 @@ func process_db_list(master_list, replica_list []string) []repl_operation {
ri += 1
} else if master_list[mi] < replica_list[ri] {
log.Printf("Missing %s, need replication", master_list[mi])
oplist = append(oplist, repl_operation{
oplist = append(oplist, replOperation{
op: OP_ADD,
src_db: master_list[mi],
dst_db: master_list[mi],
})
mi += 1
} else {
log.Printf("Deleted %s, stop replication", replica_list[ri])
oplist = append(oplist, repl_operation{
oplist = append(oplist, replOperation{
op: OP_DEL,
src_db: replica_list[ri],
dst_db: replica_list[ri],
Expand All @@ -258,7 +259,7 @@ func process_db_list(master_list, replica_list []string) []repl_operation {
}
for mi < len(master_list) {
log.Printf("Missing %s, need replication", master_list[mi])
oplist = append(oplist, repl_operation{
oplist = append(oplist, replOperation{
op: OP_ADD,
src_db: master_list[mi],
dst_db: master_list[mi],
Expand All @@ -267,7 +268,7 @@ func process_db_list(master_list, replica_list []string) []repl_operation {
}
for ri < len(replica_list) {
log.Printf("Deleted %s, stop replication", replica_list[ri])
oplist = append(oplist, repl_operation{
oplist = append(oplist, replOperation{
op: OP_DEL,
src_db: replica_list[ri],
dst_db: replica_list[ri],
Expand All @@ -277,22 +278,22 @@ func process_db_list(master_list, replica_list []string) []repl_operation {
return oplist
}

func check_user_exists(ctx context.Context, m_cli immuclient.ImmuClient, user string) bool {
userlist, err := m_cli.ListUsers(ctx)
func checkUserExists(ctx context.Context, m_cli immuclient.ImmuClient, user string) bool {
u, err := m_cli.ListUsers(ctx)
if err != nil {
log.Printf("Failed to get user list. Reason: %s", err.Error())
return false
}

for _, u := range userlist.Users {
for _, u := range u.Users {
if string(u.User) == user {
return true
}
}
return false
}

func config_replica(m_ctx context.Context, m_cli immuclient.ImmuClient, r_ctx context.Context, r_cli immuclient.ImmuClient, src_db, dst_db string) error {
func configReplica(m_ctx context.Context, m_cli immuclient.ImmuClient, r_ctx context.Context, r_cli immuclient.ImmuClient, src_db, dst_db string) error {
log.Printf("CONFIG_REPLICA: %s:%s -> %s:%s", config.MasterAddr, src_db, config.ReplicaAddr, dst_db)
log.Printf("Fetching settings from master database %s", src_db)
udr, err := m_cli.UseDatabase(m_ctx, &schema.Database{DatabaseName: src_db})
Expand All @@ -306,7 +307,7 @@ func config_replica(m_ctx context.Context, m_cli immuclient.ImmuClient, r_ctx co
log.Printf("Can't fetch setting from master database %s: %s", src_db, err.Error())
return err
}
if check_user_exists(ctx, m_cli, config.FollowerUsername) {
if checkUserExists(ctx, m_cli, config.FollowerUsername) {
log.Printf("User already exists on master database %s", src_db)
err = m_cli.ChangePermission(ctx, schema.PermissionAction_GRANT, config.FollowerUsername, src_db, auth.PermissionAdmin)
if err != nil {
Expand Down Expand Up @@ -365,48 +366,64 @@ func config_replica(m_ctx context.Context, m_cli immuclient.ImmuClient, r_ctx co
return nil
}

func analyze_db() {
master_ctx, master_cli := connect(config.MasterAddr, config.MasterPort, config.MasterUsername, config.MasterPassword)
if master_cli == nil {
return
func analyze_db(ctx context.Context) error {
tctx, cancel := context.WithTimeout(ctx, time.Second)
masterClient, err := connect(tctx, config.MasterAddr, config.MasterPort, config.MasterUsername, config.MasterPassword)
if err != nil {
cancel()
return fmt.Errorf("unable to connect to Master Addr %s Port %d error %w", config.MasterAddr, config.Port, err)
}
defer func() { _ = master_cli.Disconnect() }()
replica_ctx, replica_cli := connect(config.ReplicaAddr, config.ReplicaPort, config.ReplicaUsername, config.ReplicaPassword)
if replica_cli == nil {
return

defer func() { _ = masterClient.CloseSession(ctx) }()

replicaClient, err := connect(tctx, config.ReplicaAddr, config.ReplicaPort, config.ReplicaUsername, config.ReplicaPassword)
if err != nil {
cancel()
return fmt.Errorf("unable to connect to Replica Addr %s Port %d error %w", config.ReplicaAddr, config.ReplicaPort, err)
}
defer func() { _ = replica_cli.Disconnect() }()
defer func() { _ = replicaClient.CloseSession(ctx) }()
cancel()

master_list := db_list(master_ctx, master_cli)
replica_list := db_list(replica_ctx, replica_cli)
oplist := process_db_list(master_list, replica_list)
for _, o := range oplist {
masterDBs := db_list(ctx, masterClient)
replicaDBs := db_list(ctx, replicaClient)
ops := processDbList(masterDBs, replicaDBs)
for _, o := range ops {
log.Printf("- %v", o)
switch o.op {
case OP_ADD:
_ = config_replica(master_ctx, master_cli, replica_ctx, replica_cli, o.src_db, o.dst_db)
_ = configReplica(ctx, masterClient, ctx, replicaClient, o.src_db, o.dst_db)
case OP_DEL:
log.Printf("Ignoring orphaned database %s", o.dst_db)
}
}

return nil
}

func replicator_loop() {
t0 := time.Now().Unix()
func replicatorLoop(ctx context.Context) {
t := time.NewTicker(time.Second * 60)

for {
if BackupInfo.is_running() {
log.Printf("Backup in progress, skipping analysis")
} else {
log.Printf("Analyzing")
analyze_db()
select {
case <-t.C:
if BackupInfo.is_running() {
continue
}
if err := analyze_db(ctx); err != nil {
log.Printf("unexpected error analyzing DBs %v", err)
}

case <-ctx.Done():
return
}
log.Printf("Sleeping")
sleep(t0)
}
}

func main() {
log.Printf("Replicator %s [%s] @ %s", Version, Commit, Buildtime)
ctx, cancel := context.WithCancel(context.Background())
go rest_interface()
replicator_loop()
go replicatorLoop(ctx)
<-signals
cancel()
}