Skip to content

Commit

Permalink
enhance: add orphan-checkpoints flag in remove channel
Browse files Browse the repository at this point in the history
Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn committed Aug 23, 2024
1 parent 6692403 commit ea3af77
Showing 1 changed file with 45 additions and 32 deletions.
77 changes: 45 additions & 32 deletions states/etcd/remove/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
func ChannelCommand(cli clientv3.KV, basePath string) *cobra.Command {
cmd := &cobra.Command{
Use: "channel",
Short: "Remove channel from datacoord meta with specified condition if orphan",
Short: "Remove channel watchinfo or checkpoint from datacoord meta with specified condition if orphan",
Run: func(cmd *cobra.Command, args []string) {
channelName, err := cmd.Flags().GetString("channel")
if err != nil {
Expand All @@ -35,6 +35,16 @@ func ChannelCommand(cli clientv3.KV, basePath string) *cobra.Command {
fmt.Println(err.Error())
return
}
removeCp, err := cmd.Flags().GetBool("orphan-checkpoints")
if err != nil {
fmt.Println(err.Error())
return
}

if force && removeCp && len(channelName) == 0 {
fmt.Println("Attmept to force remove all checkpoints, ignore it...")
return
}

collections, err := common.ListCollectionsVersion(context.Background(), cli, basePath, etcdversion.GetVersion())
if err != nil {
Expand All @@ -49,42 +59,44 @@ func ChannelCommand(cli clientv3.KV, basePath string) *cobra.Command {
}
}

watchChannels, paths, err := common.ListChannelWatchV2(cli, basePath, func(info *datapbv2.ChannelWatchInfo) bool {
if len(channelName) > 0 {
return info.GetVchan().GetChannelName() == channelName
targets := make([]string, 0)
if removeCp {
allCps, cpPaths, err := common.ListChannelCheckpint(cli, basePath, func(pos *internalpb.MsgPosition) bool {
if len(channelName) > 0 {
return pos.GetChannelName() == channelName
}
return true
})
if err != nil {
fmt.Println(err.Error())
return
}
return true
})
if err != nil {
fmt.Println(err.Error())
return
}

orphanCps, cpPaths, err := common.ListChannelCheckpint(cli, basePath, func(pos *internalpb.MsgPosition) bool {
if len(channelName) > 0 {
return pos.GetChannelName() == channelName
for i, orphanCp := range allCps {
_, ok := validChannels[orphanCp.GetChannelName()]
if !ok || force {
fmt.Printf("%s selected as target orpah checkpoint, path: %s\n", orphanCp.GetChannelName(), cpPaths[i])
targets = append(targets, cpPaths[i])
}
}
return true
})
if err != nil {
fmt.Println(err.Error())
return
}

targets := make([]string, 0, len(paths)+len(cpPaths))
for i, watchChannel := range watchChannels {
_, ok := validChannels[watchChannel.GetVchan().GetChannelName()]
if !ok || force {
fmt.Printf("%s selected as target channel, collection id: %d\n", watchChannel.GetVchan().GetChannelName(), watchChannel.GetVchan().GetCollectionID())
targets = append(targets, paths[i])
} else {
watchChannels, paths, err := common.ListChannelWatchV2(cli, basePath, func(info *datapbv2.ChannelWatchInfo) bool {
if len(channelName) > 0 {
return info.GetVchan().GetChannelName() == channelName
}
return true
})
if err != nil {
fmt.Println(err.Error())
return
}
}

for i, orphanCp := range orphanCps {
_, ok := validChannels[orphanCp.GetChannelName()]
if !ok || force {
fmt.Printf("%s selected as target orpah checkpoint\n", orphanCp.GetChannelName())
targets = append(targets, cpPaths[i])
for i, watchChannel := range watchChannels {
_, ok := validChannels[watchChannel.GetVchan().GetChannelName()]
if !ok || force {
fmt.Printf("%s selected as target channel watchinfo, collection id: %d\n", watchChannel.GetVchan().GetChannelName(), watchChannel.GetVchan().GetCollectionID())
targets = append(targets, paths[i])
}
}
}

Expand All @@ -108,5 +120,6 @@ func ChannelCommand(cli clientv3.KV, basePath string) *cobra.Command {
cmd.Flags().Bool("run", false, "flags indicating whether to remove channel from meta")
cmd.Flags().String("channel", "", "channel name to remove")
cmd.Flags().Bool("force", false, "force remove channel ignoring collection check")
cmd.Flags().Bool("orphan-checkpoints", false, "Whether to remove leaking channel checkpoints, cannot be used with --force when no channel is specified. If no channel is specified, all orphan checkpoints will be removed")
return cmd
}

0 comments on commit ea3af77

Please sign in to comment.