diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index b9211de0..8d7cfb7c 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -6,6 +6,7 @@ import ( "RedisShake/internal/config" "RedisShake/internal/entry" "RedisShake/internal/log" + "RedisShake/internal/rdb/types" "RedisShake/internal/utils" "fmt" "math/bits" @@ -165,11 +166,31 @@ func (r *scanStandaloneReader) fetch() { pttl = 0 // -1 means no expire } if uint64(len(dump)) > config.Opt.Advanced.TargetRedisProtoMaxBulkLen { - log.Panicf("not support large key. key=[%s], len=[%d]", key, len(dump)) + log.Warnf("key=[%s] dump len=[%d] too large, split it. This is not a good practice in Redis.", key, len(dump)) + typeByte := dump[0] + anotherReader := strings.NewReader(dump[1 : len(dump)-10]) + o := types.ParseObject(anotherReader, typeByte, key) + cmds := o.Rewrite() + for _, cmd := range cmds { + e := entry.NewEntry() + e.DbId = dbId + e.Argv = cmd + r.ch <- e + } + if pttl != 0 { + e := entry.NewEntry() + e.DbId = dbId + e.Argv = []string{"PEXPIRE", key, strconv.Itoa(pttl)} + r.ch <- e + } } else { + argv := []string{"RESTORE", key, strconv.Itoa(pttl), dump} + if config.Opt.Advanced.RDBRestoreCommandBehavior == "rewrite" { + argv = append(argv, "replace") + } r.ch <- &entry.Entry{ DbId: dbId, - Argv: []string{"RESTORE", key, strconv.Itoa(pttl), dump}, + Argv: argv, } } }