From 2162ce34ce7e96a956686175a57d14f9e5deff48 Mon Sep 17 00:00:00 2001 From: ItsFunny <39111451+ItsFunny@users.noreply.github.com> Date: Sun, 30 Jan 2022 15:42:00 +0800 Subject: [PATCH] Merge PR: deduplicate (#1514) add test case Co-authored-by: KamiD <44460798+KamiD@users.noreply.github.com> --- x/evm/watcher/watcher.go | 126 +++++++++++++++++++++++++++++++++- x/evm/watcher/watcher_test.go | 86 +++++++++++++++++++++++ 2 files changed, 211 insertions(+), 1 deletion(-) diff --git a/x/evm/watcher/watcher.go b/x/evm/watcher/watcher.go index 5db647fad2..e0e2788ee5 100644 --- a/x/evm/watcher/watcher.go +++ b/x/evm/watcher/watcher.go @@ -377,6 +377,7 @@ func (w *Watcher) Commit() { batch := w.batch w.dispatchJob(func() { w.commitBatch(batch) }) + // we dont do deduplicatie here,we do it in `commit routine` // get centerBatch for sending to DataCenter ddsBatch := make([]*Batch, len(batch)) for i, b := range batch { @@ -412,7 +413,12 @@ func (w *Watcher) CommitWatchData(data WatchData) { } func (w *Watcher) commitBatch(batch []WatchMessage) { + filterMap := make(map[string]WatchMessage) for _, b := range batch { + filterMap[bytes2Key(b.GetKey())] = b + } + + for _, b := range filterMap { key := b.GetKey() value := []byte(b.GetValue()) typeValue := b.GetType() @@ -472,7 +478,8 @@ func (w *Watcher) GetWatchDataFunc() func() ([]byte, error) { value.DelayEraseKey = w.delayEraseKey return func() ([]byte, error) { - valueByte, err := value.MarshalToAmino(nil) + filterWatcher := filterCopy(value) + valueByte, err := filterWatcher.MarshalToAmino(nil) if err != nil { return nil, err } @@ -526,6 +533,123 @@ func (w *Watcher) CheckWatchDB(keys [][]byte, mode string) { w.log.Info("watchDB delta", "mode", mode, "height", w.height, "hash", hex.EncodeToString(kvHash.Sum(nil)), "kv", output) } +func bytes2Key(keyBytes []byte) string { + return string(keyBytes) +} + +func key2Bytes(key string) []byte { + return []byte(key) +} + +func filterCopy(origin *WatchData) *WatchData { + return &WatchData{ + DirtyAccount: filterAccount(origin.DirtyAccount), + Batches: filterBatch(origin.Batches), + DelayEraseKey: filterDelayEraseKey(origin.DelayEraseKey), + BloomData: filterBloomData(origin.BloomData), + DirtyList: filterDirtyList(origin.DirtyList), + } +} + +func filterAccount(accounts []*sdk.AccAddress) []*sdk.AccAddress { + if len(accounts) == 0 { + return nil + } + + filterAccountMap := make(map[string]*sdk.AccAddress) + for _, account := range accounts { + filterAccountMap[bytes2Key(account.Bytes())] = account + } + + ret := make([]*sdk.AccAddress, len(filterAccountMap)) + i := 0 + for _, acc := range filterAccountMap { + ret[i] = acc + i++ + } + + return ret +} + +func filterBatch(datas []*Batch) []*Batch { + if len(datas) == 0 { + return nil + } + + filterBatch := make(map[string]*Batch) + for _, b := range datas { + filterBatch[bytes2Key(b.Key)] = b + } + + ret := make([]*Batch, len(filterBatch)) + i := 0 + for _, b := range filterBatch { + ret[i] = b + i++ + } + + return ret +} + +func filterDelayEraseKey(datas [][]byte) [][]byte { + if len(datas) == 0 { + return nil + } + + filterDelayEraseKey := make(map[string][]byte, 0) + for _, b := range datas { + filterDelayEraseKey[bytes2Key(b)] = b + } + + ret := make([][]byte, len(filterDelayEraseKey)) + i := 0 + for _, k := range filterDelayEraseKey { + ret[i] = k + i++ + } + + return ret +} +func filterBloomData(datas []*evmtypes.KV) []*evmtypes.KV { + if len(datas) == 0 { + return nil + } + + filterBloomData := make(map[string]*evmtypes.KV, 0) + for _, k := range datas { + filterBloomData[bytes2Key(k.Key)] = k + } + + ret := make([]*evmtypes.KV, len(filterBloomData)) + i := 0 + for _, k := range filterBloomData { + ret[i] = k + i++ + } + + return ret +} + +func filterDirtyList(datas [][]byte) [][]byte { + if len(datas) == 0 { + return nil + } + + filterDirtyList := make(map[string][]byte, 0) + for _, k := range datas { + filterDirtyList[bytes2Key(k)] = k + } + + ret := make([][]byte, len(filterDirtyList)) + i := 0 + for _, k := range filterDirtyList { + ret[i] = k + i++ + } + + return ret +} + /////////// job func (w *Watcher) jobRoutine() { if !w.Enabled() { diff --git a/x/evm/watcher/watcher_test.go b/x/evm/watcher/watcher_test.go index 9885b7b183..e9ff46b3a4 100644 --- a/x/evm/watcher/watcher_test.go +++ b/x/evm/watcher/watcher_test.go @@ -1,7 +1,12 @@ package watcher_test import ( + "encoding/hex" + "fmt" + "github.com/okex/exchain/libs/cosmos-sdk/x/auth" + "github.com/okex/exchain/libs/tendermint/libs/log" "math/big" + "os" "strings" "testing" "time" @@ -308,3 +313,84 @@ func TestDeployAndCallContract(t *testing.T) { testWatchData(t, w) } + +type mockDuplicateAccount struct { + *auth.BaseAccount + Addr byte + Seq byte +} + +func (a *mockDuplicateAccount) GetAddress() sdk.AccAddress { + return []byte{a.Addr} +} + +func newMockAccount(byteAddr, seq byte) *mockDuplicateAccount { + ret := &mockDuplicateAccount{Addr: byteAddr, Seq: seq} + pubkey := secp256k1.GenPrivKey().PubKey() + addr := sdk.AccAddress(pubkey.Address()) + baseAcc := auth.NewBaseAccount(addr, nil, pubkey, 0, 0) + ret.BaseAccount = baseAcc + return ret +} + +func TestDuplicateAddress(t *testing.T) { + accAdds := make([]*sdk.AccAddress, 0) + for i := 0; i < 10; i++ { + adds := hex.EncodeToString([]byte(fmt.Sprintf("addr-%d", i))) + a, _ := sdk.AccAddressFromHex(adds) + accAdds = append(accAdds, &a) + } + adds := hex.EncodeToString([]byte(fmt.Sprintf("addr-%d", 1))) + a, _ := sdk.AccAddressFromHex(adds) + accAdds = append(accAdds, &a) + filterM := make(map[string]struct{}) + count := 0 + for _, add := range accAdds { + _, exist := filterM[string(add.Bytes())] + if exist { + count++ + continue + } + filterM[string(add.Bytes())] = struct{}{} + } + require.Equal(t, 1, count) +} + +func TestDuplicateWatchMessage(t *testing.T) { + w := setupTest() + a1 := newMockAccount(1, 1) + w.app.EvmKeeper.Watcher.SaveAccount(a1, true) + a2 := newMockAccount(1, 2) + w.app.EvmKeeper.Watcher.SaveAccount(a2, true) + w.app.EvmKeeper.Watcher.Commit() + time.Sleep(time.Second) + store := watcher.InstanceOfWatchStore() + pWd := getDBKV(store) + require.Equal(t, 1, len(pWd)) +} + +func TestWriteLatestMsg(t *testing.T) { + viper.Set(watcher.FlagFastQuery, true) + viper.Set(watcher.FlagDBBackend, "memdb") + w := watcher.NewWatcher(log.NewTMLogger(os.Stdout)) + w.SetWatchDataFunc() + + a1 := newMockAccount(1, 1) + a11 := newMockAccount(1, 2) + a111 := newMockAccount(1, 3) + w.SaveAccount(a1, true) + w.SaveAccount(a11, true) + w.SaveAccount(a111, true) + w.Commit() + time.Sleep(time.Second) + store := watcher.InstanceOfWatchStore() + pWd := getDBKV(store) + require.Equal(t, 1, len(pWd)) + + m := watcher.NewMsgAccount(a1) + v, err := store.Get(m.GetKey()) + require.NoError(t, err) + mm := make(map[string]interface{}) + json.Unmarshal(v, &mm) + require.Equal(t, 3, int(mm["Seq"].(float64))) +}