-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
128 lines (102 loc) · 3.26 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/go-redis/redis/v8"
"github.com/schollz/progressbar/v3"
)
// main copies every key of database 0 from the Redis instance given by -old
// into the instance given by -new, carrying over each key's remaining TTL.
//
// The target instance is wiped with FLUSHALL before the copy starts. The
// process exits with status 1 when a flag is missing, when any bulk step
// fails, or when the two databases end up with different key counts.
func main() {
	var oldRedisURL, newRedisURL string
	flag.StringVar(&oldRedisURL, "old", "", "Old Redis connection string (e.g., '[user]:[pass]@url:port')")
	flag.StringVar(&newRedisURL, "new", "", "New Redis connection string (e.g., '[user]:[pass]@url:port')")
	flag.Parse()

	if oldRedisURL == "" || newRedisURL == "" {
		fmt.Println("Please provide the old and new Redis connection strings using the -old and -new flags.")
		flag.Usage()
		os.Exit(1)
	}

	log.Println("Starting the Redis data transfer application...")

	ctx := context.Background()

	// Generous timeouts: KEYS, DUMP and the final pipeline flush may each
	// take a long time on a large dataset.
	oldClient := redis.NewClient(&redis.Options{
		Addr:         oldRedisURL,
		DB:           0,
		ReadTimeout:  10 * time.Minute,
		WriteTimeout: 10 * time.Minute,
	})
	newClient := redis.NewClient(&redis.Options{
		Addr:         newRedisURL,
		DB:           0,
		ReadTimeout:  10 * time.Minute,
		WriteTimeout: 10 * time.Minute,
	})

	oldKeysCount, err := oldClient.DBSize(ctx).Result()
	if err != nil {
		log.Fatalf("Error getting the count of keys from the old Redis database: %v", err)
	}
	log.Printf("Number of keys in the old Redis database (before transfer): %d", oldKeysCount)

	// A failed flush would leave stale keys in the target and make the final
	// key-count comparison meaningless, so it is treated as fatal.
	if err := newClient.FlushAll(ctx).Err(); err != nil {
		log.Fatalf("Error flushing the new Redis database: %v", err)
	}

	keys, err := oldClient.Keys(ctx, "*").Result()
	if err != nil {
		log.Fatalf("Error getting keys: %v", err)
	}

	bar := progressbar.Default(int64(len(keys)))

	// All workers queue onto this one pipeline; nothing reaches the target
	// until Exec below.
	// NOTE(review): this relies on the go-redis v8 Pipeline being safe for
	// concurrent use by multiple goroutines - confirm before upgrading the
	// client library.
	newPipeline := newClient.Pipeline()

	// Buffered to len(keys) so the producer loop below never blocks.
	keysChan := make(chan string, len(keys))

	var wg sync.WaitGroup
	numWorkers := 100
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func() {
			defer wg.Done()
			// The loop ends when keysChan is closed and drained. The empty
			// string is a valid Redis key, so it is processed like any other.
			for key := range keysChan {
				queueRestore(ctx, oldClient, newPipeline, key)
				// Progress reflects keys processed, including skipped ones.
				// A rendering failure must not abort the transfer.
				_ = bar.Add(1)
			}
		}()
	}

	for _, key := range keys {
		keysChan <- key
	}
	close(keysChan)
	wg.Wait()

	if _, err = newPipeline.Exec(ctx); err != nil {
		log.Fatalf("Error executing the new Redis pipeline: %v", err)
	}

	newKeysCount, err := newClient.DBSize(ctx).Result()
	if err != nil {
		log.Fatalf("Error getting the count of keys from the new Redis database: %v", err)
	}

	log.Println("Data transfer completed successfully.")
	log.Printf("Number of keys in the old Redis database: %d", oldKeysCount)
	log.Printf("Number of keys in the new Redis database: %d", newKeysCount)

	if oldKeysCount != newKeysCount {
		log.Printf("Exiting job with error, oldKeyCount != newKeyscount")
		os.Exit(1)
	}
}

// queueRestore reads the serialized value and remaining TTL of key from src
// and queues a RESTORE ... REPLACE for it on dst. Any failure is logged and
// the key is skipped, so a key is never queued with a guessed TTL.
func queueRestore(ctx context.Context, src *redis.Client, dst redis.Pipeliner, key string) {
	// DUMP alone tells us whether the key still exists (redis.Nil), which
	// avoids a separate EXISTS round trip and the gap between the two calls.
	dumpData, err := src.Dump(ctx, key).Result()
	if err == redis.Nil {
		log.Printf("Key doesn't exist with value, probably it's a (nil) value and type = none, %s\n", key)
		return
	}
	if err != nil {
		log.Printf("Error getting dump data for key %s: %v\n", key, err)
		return
	}

	// PTTL rather than TTL: RESTORE takes milliseconds, and a whole-second
	// TTL of 0 would be indistinguishable from "no expiry".
	ttl, err := src.PTTL(ctx, key).Result()
	if err != nil {
		log.Printf("Error getting TTL for key %s: %v\n", key, err)
		return
	}

	switch {
	case ttl < 0:
		// Negative replies mean "no expiry" (or the key vanished after the
		// dump); RESTORE expresses "no expiry" as a TTL of 0.
		ttl = 0
	case ttl == 0:
		// The key is expiring right now; keep it volatile instead of
		// letting a zero TTL turn it into a persistent key.
		ttl = time.Millisecond
	}

	dst.RestoreReplace(ctx, key, ttl, dumpData)
}