From 903f7c3e9b62ac2b46ee7fb7eaade6bc92b94a14 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 11 Jun 2024 13:43:14 -0400 Subject: [PATCH 01/16] migrate x/sync to p2p Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/p2p/p2ptest/client.go | 24 ++--- network/p2p/p2ptest/client_test.go | 4 +- x/sync/client_test.go | 19 +--- x/sync/manager.go | 23 ++--- x/sync/network_server.go | 75 +++++++------- x/sync/network_server_test.go | 21 ++-- x/sync/sync_test.go | 152 +++++++++++++++-------------- 7 files changed, 155 insertions(+), 163 deletions(-) diff --git a/network/p2p/p2ptest/client.go b/network/p2p/p2ptest/client.go index b75654028666..747904b40ffb 100644 --- a/network/p2p/p2ptest/client.go +++ b/network/p2p/p2ptest/client.go @@ -21,25 +21,19 @@ import ( // NewClient generates a client-server pair and returns the client used to // communicate with a server with the specified handler -func NewClient( - t *testing.T, - ctx context.Context, - handler p2p.Handler, - clientNodeID ids.NodeID, - serverNodeID ids.NodeID, -) *p2p.Client { +func NewClient(t *testing.T, rootCtx context.Context, handler p2p.Handler) *p2p.Client { clientSender := &enginetest.Sender{} serverSender := &enginetest.Sender{} + clientNodeID := ids.GenerateTestNodeID() clientNetwork, err := p2p.NewNetwork(logging.NoLog{}, clientSender, prometheus.NewRegistry(), "") require.NoError(t, err) + serverNodeID := ids.GenerateTestNodeID() serverNetwork, err := p2p.NewNetwork(logging.NoLog{}, serverSender, prometheus.NewRegistry(), "") require.NoError(t, err) clientSender.SendAppGossipF = func(ctx context.Context, _ common.SendConfig, gossipBytes []byte) error { - // Send the request asynchronously to avoid deadlock when the server - // sends the response back to the client go func() { require.NoError(t, serverNetwork.AppGossip(ctx, clientNodeID, gossipBytes)) }() @@ -58,8 +52,6 @@ func NewClient( } serverSender.SendAppResponseF = func(ctx context.Context, _ ids.NodeID, requestID uint32, responseBytes []byte) error { - // Send the request asynchronously to avoid deadlock when the server - // sends the response back to the client go func() { require.NoError(t, clientNetwork.AppResponse(ctx, serverNodeID, requestID, responseBytes)) }() @@ -68,8 +60,6 @@ func NewClient( } serverSender.SendAppErrorF = func(ctx context.Context, _ ids.NodeID, requestID uint32, errorCode int32, errorMessage string) error { - // Send the request asynchronously to avoid deadlock when the server - // sends the response back to the client go func() { require.NoError(t, clientNetwork.AppRequestFailed(ctx, serverNodeID, requestID, &common.AppError{ Code: errorCode, @@ -80,10 +70,10 @@ func NewClient( return nil } - require.NoError(t, clientNetwork.Connected(ctx, clientNodeID, nil)) - require.NoError(t, clientNetwork.Connected(ctx, serverNodeID, nil)) - require.NoError(t, serverNetwork.Connected(ctx, clientNodeID, nil)) - require.NoError(t, serverNetwork.Connected(ctx, serverNodeID, nil)) + require.NoError(t, clientNetwork.Connected(rootCtx, clientNodeID, nil)) + require.NoError(t, clientNetwork.Connected(rootCtx, serverNodeID, nil)) + require.NoError(t, serverNetwork.Connected(rootCtx, clientNodeID, nil)) + require.NoError(t, serverNetwork.Connected(rootCtx, serverNodeID, nil)) require.NoError(t, serverNetwork.AddHandler(0, handler)) return clientNetwork.NewClient(0) diff --git a/network/p2p/p2ptest/client_test.go b/network/p2p/p2ptest/client_test.go index 45ae970ecf0f..cef624aaccbc 100644 --- a/network/p2p/p2ptest/client_test.go +++ b/network/p2p/p2ptest/client_test.go @@ -27,7 +27,7 @@ func TestNewClient_AppGossip(t *testing.T) { }, } - client := NewClient(t, ctx, testHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + client := NewClient(t, ctx, testHandler) require.NoError(client.AppGossip(ctx, common.SendConfig{}, []byte("foobar"))) <-appGossipChan } @@ -94,7 +94,7 @@ func TestNewClient_AppRequest(t *testing.T) { }, } - client := NewClient(t, ctx, testHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + client := NewClient(t, ctx, testHandler) require.NoError(tt.appRequestF( ctx, client, diff --git a/x/sync/client_test.go b/x/sync/client_test.go index decc3e20405d..2633071439da 100644 --- a/x/sync/client_test.go +++ b/x/sync/client_test.go @@ -38,12 +38,12 @@ func newDefaultDBConfig() merkledb.Config { } } -func newFlakyRangeProofHandler( +func newModifiedRangeProofHandler( t *testing.T, db merkledb.MerkleDB, modifyResponse func(response *merkledb.RangeProof), ) p2p.Handler { - handler := NewGetRangeProofHandler(logging.NoLog{}, db) + handler := NewSyncGetRangeProofHandler(logging.NoLog{}, db) c := counter{m: 2} return &p2p.TestHandler{ @@ -74,12 +74,12 @@ func newFlakyRangeProofHandler( } } -func newFlakyChangeProofHandler( +func newModifiedChangeProofHandler( t *testing.T, db merkledb.MerkleDB, modifyResponse func(response *merkledb.ChangeProof), ) p2p.Handler { - handler := NewGetChangeProofHandler(logging.NoLog{}, db) + handler := NewSyncGetChangeProofHandler(logging.NoLog{}, db) c := counter{m: 2} return &p2p.TestHandler{ @@ -145,14 +145,3 @@ func (c *counter) Inc() int { c.i++ return result } - -type waitingHandler struct { - p2p.NoOpHandler - handler p2p.Handler - updatedRootChan chan struct{} -} - -func (w *waitingHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *common.AppError) { - <-w.updatedRootChan - return w.handler.AppRequest(ctx, nodeID, deadline, requestBytes) -} diff --git a/x/sync/manager.go b/x/sync/manager.go index ddcdc1637088..dd176c223033 100644 --- a/x/sync/manager.go +++ b/x/sync/manager.go @@ -41,7 +41,7 @@ var ( ErrAlreadyStarted = errors.New("cannot start a Manager that has already been started") ErrAlreadyClosed = errors.New("Manager is closed") ErrNoRangeProofClientProvided = errors.New("range proof client is a required field of the sync config") - ErrNoChangeProofClientProvided = errors.New("change proof client is a required field of the sync config") + ErrNoChangeProofClientProvided = errors.New("change proofclient is a required field of the sync config") ErrNoDatabaseProvided = errors.New("sync database is a required field of the sync config") ErrNoLogProvided = errors.New("log is a required field of the sync config") ErrZeroWorkLimit = errors.New("simultaneous work limit must be greater than 0") @@ -305,12 +305,7 @@ func (m *Manager) doWork(ctx context.Context, work *workItem) { return } - select { - case <-ctx.Done(): - m.finishWorkItem() - return - case <-time.After(waitTime): - } + <-time.After(waitTime) if work.localRootID == ids.Empty { // the keys in this range have not been downloaded, so get all key/values @@ -373,8 +368,7 @@ func (m *Manager) requestChangeProof(ctx context.Context, work *workItem) { defer m.finishWorkItem() if err := m.handleChangeProofResponse(ctx, targetRootID, work, request, responseBytes, err); err != nil { - // TODO log responses - m.config.Log.Debug("dropping response", zap.Error(err), zap.Stringer("request", request)) + m.config.Log.Debug("dropping response", zap.Error(err)) m.retryWork(work) return } @@ -431,8 +425,7 @@ func (m *Manager) requestRangeProof(ctx context.Context, work *workItem) { defer m.finishWorkItem() if err := m.handleRangeProofResponse(ctx, targetRootID, work, request, responseBytes, appErr); err != nil { - // TODO log responses - m.config.Log.Debug("dropping response", zap.Error(err), zap.Stringer("request", request)) + m.config.Log.Debug("dropping response", zap.Error(err)) m.retryWork(work) return } @@ -468,11 +461,10 @@ func (m *Manager) retryWork(work *workItem) { m.workLock.Lock() m.unprocessedWork.Insert(work) m.workLock.Unlock() - m.unprocessedWorkCond.Signal() } // Returns an error if we should drop the response -func (m *Manager) shouldHandleResponse( +func (m *Manager) handleResponse( bytesLimit uint32, responseBytes []byte, err error, @@ -507,7 +499,7 @@ func (m *Manager) handleRangeProofResponse( responseBytes []byte, err error, ) error { - if err := m.shouldHandleResponse(request.BytesLimit, responseBytes, err); err != nil { + if err := m.handleResponse(request.BytesLimit, responseBytes, err); err != nil { return err } @@ -558,7 +550,7 @@ func (m *Manager) handleChangeProofResponse( responseBytes []byte, err error, ) error { - if err := m.shouldHandleResponse(request.BytesLimit, responseBytes, err); err != nil { + if err := m.handleResponse(request.BytesLimit, responseBytes, err); err != nil { return err } @@ -614,6 +606,7 @@ func (m *Manager) handleChangeProofResponse( m.completeWorkItem(ctx, work, largestHandledKey, targetRootID, changeProof.EndProof) case *pb.SyncGetChangeProofResponse_RangeProof: + var rangeProof merkledb.RangeProof if err := rangeProof.UnmarshalProto(changeProofResp.RangeProof); err != nil { return err diff --git a/x/sync/network_server.go b/x/sync/network_server.go index 2153f2fbcc97..ec70c2335b64 100644 --- a/x/sync/network_server.go +++ b/x/sync/network_server.go @@ -49,8 +49,8 @@ var ( errInvalidBounds = errors.New("start key is greater than end key") errInvalidRootHash = fmt.Errorf("root hash must have length %d", hashing.HashLen) - _ p2p.Handler = (*GetChangeProofHandler)(nil) - _ p2p.Handler = (*GetRangeProofHandler)(nil) + _ p2p.Handler = (*SyncGetChangeProofHandler)(nil) + _ p2p.Handler = (*SyncGetRangeProofHandler)(nil) ) func maybeBytesToMaybe(mb *pb.MaybeBytes) maybe.Maybe[[]byte] { @@ -60,30 +60,30 @@ func maybeBytesToMaybe(mb *pb.MaybeBytes) maybe.Maybe[[]byte] { return maybe.Nothing[[]byte]() } -func NewGetChangeProofHandler(log logging.Logger, db DB) *GetChangeProofHandler { - return &GetChangeProofHandler{ +func NewSyncGetChangeProofHandler(log logging.Logger, db DB) *SyncGetChangeProofHandler { + return &SyncGetChangeProofHandler{ log: log, db: db, } } -type GetChangeProofHandler struct { +type SyncGetChangeProofHandler struct { log logging.Logger db DB } -func (*GetChangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {} +func (*SyncGetChangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {} -func (g *GetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) { - req := &pb.SyncGetChangeProofRequest{} - if err := proto.Unmarshal(requestBytes, req); err != nil { +func (s *SyncGetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) { + request := &pb.SyncGetChangeProofRequest{} + if err := proto.Unmarshal(requestBytes, request); err != nil { return nil, &common.AppError{ Code: p2p.ErrUnexpected.Code, Message: fmt.Sprintf("failed to unmarshal request: %s", err), } } - if err := validateChangeProofRequest(req); err != nil { + if err := validateChangeProofRequest(request); err != nil { return nil, &common.AppError{ Code: p2p.ErrUnexpected.Code, Message: fmt.Sprintf("invalid request: %s", err), @@ -92,13 +92,13 @@ func (g *GetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ // override limits if they exceed caps var ( - keyLimit = min(req.KeyLimit, maxKeyValuesLimit) - bytesLimit = min(int(req.BytesLimit), maxByteSizeLimit) - start = maybeBytesToMaybe(req.StartKey) - end = maybeBytesToMaybe(req.EndKey) + keyLimit = min(request.KeyLimit, maxKeyValuesLimit) + bytesLimit = min(int(request.BytesLimit), maxByteSizeLimit) + start = maybeBytesToMaybe(request.StartKey) + end = maybeBytesToMaybe(request.EndKey) ) - startRoot, err := ids.ToID(req.StartRootHash) + startRoot, err := ids.ToID(request.StartRootHash) if err != nil { return nil, &common.AppError{ Code: p2p.ErrUnexpected.Code, @@ -106,7 +106,7 @@ func (g *GetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ } } - endRoot, err := ids.ToID(req.EndRootHash) + endRoot, err := ids.ToID(request.EndRootHash) if err != nil { return nil, &common.AppError{ Code: p2p.ErrUnexpected.Code, @@ -120,7 +120,6 @@ func (g *GetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ if !errors.Is(err, merkledb.ErrInsufficientHistory) { // We should only fail to get a change proof if we have insufficient history. // Other errors are unexpected. - // TODO define custom errors return nil, &common.AppError{ Code: p2p.ErrUnexpected.Code, Message: fmt.Sprintf("failed to get change proof: %s", err), @@ -141,11 +140,11 @@ func (g *GetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ ctx, g.db, &pb.SyncGetRangeProofRequest{ - RootHash: req.EndRootHash, - StartKey: req.StartKey, - EndKey: req.EndKey, - KeyLimit: req.KeyLimit, - BytesLimit: req.BytesLimit, + RootHash: request.EndRootHash, + StartKey: request.StartKey, + EndKey: request.EndKey, + KeyLimit: request.KeyLimit, + BytesLimit: request.BytesLimit, }, func(rangeProof *merkledb.RangeProof) ([]byte, error) { return proto.Marshal(&pb.SyncGetChangeProofResponse{ @@ -192,30 +191,34 @@ func (g *GetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ } } -func NewGetRangeProofHandler(log logging.Logger, db DB) *GetRangeProofHandler { - return &GetRangeProofHandler{ +func (*SyncGetChangeProofHandler) CrossChainAppRequest(context.Context, ids.ID, time.Time, []byte) ([]byte, error) { + return nil, nil +} + +func NewSyncGetRangeProofHandler(log logging.Logger, db DB) *SyncGetRangeProofHandler { + return &SyncGetRangeProofHandler{ log: log, db: db, } } -type GetRangeProofHandler struct { +type SyncGetRangeProofHandler struct { log logging.Logger db DB } -func (*GetRangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {} +func (*SyncGetRangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {} -func (g *GetRangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) { - req := &pb.SyncGetRangeProofRequest{} - if err := proto.Unmarshal(requestBytes, req); err != nil { +func (s *SyncGetRangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) { + request := &pb.SyncGetRangeProofRequest{} + if err := proto.Unmarshal(requestBytes, request); err != nil { return nil, &common.AppError{ Code: p2p.ErrUnexpected.Code, Message: fmt.Sprintf("failed to unmarshal request: %s", err), } } - if err := validateRangeProofRequest(req); err != nil { + if err := validateRangeProofRequest(request); err != nil { return nil, &common.AppError{ Code: p2p.ErrUnexpected.Code, Message: fmt.Sprintf("invalid range proof request: %s", err), @@ -223,13 +226,13 @@ func (g *GetRangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ t } // override limits if they exceed caps - req.KeyLimit = min(req.KeyLimit, maxKeyValuesLimit) - req.BytesLimit = min(req.BytesLimit, maxByteSizeLimit) + request.KeyLimit = min(request.KeyLimit, maxKeyValuesLimit) + request.BytesLimit = min(request.BytesLimit, maxByteSizeLimit) proofBytes, err := getRangeProof( ctx, - g.db, - req, + s.db, + request, func(rangeProof *merkledb.RangeProof) ([]byte, error) { return proto.Marshal(rangeProof.ToProto()) }, @@ -244,6 +247,10 @@ func (g *GetRangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ t return proofBytes, nil } +func (*SyncGetRangeProofHandler) CrossChainAppRequest(context.Context, ids.ID, time.Time, []byte) ([]byte, error) { + return nil, nil +} + // Get the range proof specified by [req]. // If the generated proof is too large, the key limit is reduced // and the proof is regenerated. This process is repeated until diff --git a/x/sync/network_server_test.go b/x/sync/network_server_test.go index c78554cea59f..84dbd1c12682 100644 --- a/x/sync/network_server_test.go +++ b/x/sync/network_server_test.go @@ -85,7 +85,7 @@ func Test_Server_GetRangeProof(t *testing.T) { expectedErr: p2p.ErrUnexpected, }, { - name: "response bounded by key limit", + name: "key limit too large", request: &pb.SyncGetRangeProofRequest{ RootHash: smallTrieRoot[:], KeyLimit: 2 * defaultRequestKeyLimit, @@ -94,7 +94,7 @@ func Test_Server_GetRangeProof(t *testing.T) { expectedResponseLen: defaultRequestKeyLimit, }, { - name: "response bounded by byte limit", + name: "bytes limit too large", request: &pb.SyncGetRangeProofRequest{ RootHash: smallTrieRoot[:], KeyLimit: defaultRequestKeyLimit, @@ -118,7 +118,7 @@ func Test_Server_GetRangeProof(t *testing.T) { t.Run(test.name, func(t *testing.T) { require := require.New(t) - handler := NewGetRangeProofHandler(logging.NoLog{}, smallTrieDB) + handler := NewSyncGetRangeProofHandler(logging.NoLog{}, smallTrieDB) requestBytes, err := proto.Marshal(test.request) require.NoError(err) responseBytes, err := handler.AppRequest(context.Background(), test.nodeID, time.Time{}, requestBytes) @@ -130,12 +130,17 @@ func Test_Server_GetRangeProof(t *testing.T) { require.Nil(responseBytes) return } + require.NotNil(responseBytes) - var proofProto pb.RangeProof - require.NoError(proto.Unmarshal(responseBytes, &proofProto)) + var proof *merkledb.RangeProof + if !test.proofNil { + var proofProto pb.RangeProof + require.NoError(proto.Unmarshal(responseBytes, &proofProto)) - var proof merkledb.RangeProof - require.NoError(proof.UnmarshalProto(&proofProto)) + var p merkledb.RangeProof + require.NoError(p.UnmarshalProto(&proofProto)) + proof = &p + } if test.expectedResponseLen > 0 { require.LessOrEqual(len(proof.KeyValues), test.expectedResponseLen) @@ -339,7 +344,7 @@ func Test_Server_GetChangeProof(t *testing.T) { t.Run(test.name, func(t *testing.T) { require := require.New(t) - handler := NewGetChangeProofHandler(logging.NoLog{}, serverDB) + handler := NewSyncGetChangeProofHandler(logging.NoLog{}, serverDB) requestBytes, err := proto.Marshal(test.request) require.NoError(err) diff --git a/x/sync/sync_test.go b/x/sync/sync_test.go index 41dd5829a7b8..db480f90f0c7 100644 --- a/x/sync/sync_test.go +++ b/x/sync/sync_test.go @@ -19,12 +19,13 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/network/p2p/p2ptest" + "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/maybe" "github.com/ava-labs/avalanchego/x/merkledb" ) -var _ p2p.Handler = (*waitingHandler)(nil) +var _ p2p.Handler = (*testHandler)(nil) func Test_Creation(t *testing.T) { require := require.New(t) @@ -39,8 +40,8 @@ func Test_Creation(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: &p2p.Client{}, + ChangeProofClient: &p2p.Client{}, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, BranchFactor: merkledb.BranchFactor16, @@ -72,8 +73,8 @@ func Test_Completion(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, emptyDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, emptyDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, emptyDB)), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, emptyDB)), TargetRoot: emptyRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -177,8 +178,8 @@ func Test_Sync_FindNextKey_InSync(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync)), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync)), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -253,8 +254,8 @@ func Test_Sync_FindNextKey_Deleted(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: &p2p.Client{}, + ChangeProofClient: &p2p.Client{}, TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -303,8 +304,8 @@ func Test_Sync_FindNextKey_BranchInLocal(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: &p2p.Client{}, + ChangeProofClient: &p2p.Client{}, TargetRoot: targetRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -340,8 +341,8 @@ func Test_Sync_FindNextKey_BranchInReceived(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: &p2p.Client{}, + ChangeProofClient: &p2p.Client{}, TargetRoot: targetRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -376,8 +377,8 @@ func Test_Sync_FindNextKey_ExtraValues(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync)), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync)), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -438,8 +439,8 @@ func TestFindNextKeyEmptyEndProof(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: &p2p.Client{}, + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, db)), TargetRoot: ids.Empty, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -507,8 +508,8 @@ func Test_Sync_FindNextKey_DifferentChild(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync)), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync)), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -540,6 +541,7 @@ func Test_Sync_FindNextKey_DifferentChild(t *testing.T) { // Test findNextKey by computing the expected result in a naive, inefficient // way and comparing it to the actual result + func TestFindNextKeyRandom(t *testing.T) { now := time.Now().UnixNano() t.Logf("seed: %d", now) @@ -730,8 +732,8 @@ func TestFindNextKeyRandom(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: localDB, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, remoteDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, remoteDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: &p2p.Client{}, + ChangeProofClient: &p2p.Client{}, TargetRoot: ids.GenerateTestID(), SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -773,27 +775,27 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "range proof bad response - too many leaves in response", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.KeyValues = append(response.KeyValues, merkledb.KeyValue{}) }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewClient(t, context.Background(), handler) }, }, { name: "range proof bad response - removed first key in response", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.KeyValues = response.KeyValues[min(1, len(response.KeyValues)):] }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewClient(t, context.Background(), handler) }, }, { name: "range proof bad response - removed first key in response and replaced proof", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.KeyValues = response.KeyValues[min(1, len(response.KeyValues)):] response.KeyValues = []merkledb.KeyValue{ { @@ -813,111 +815,111 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { } }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewClient(t, context.Background(), handler) }, }, { name: "range proof bad response - removed key from middle of response", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { i := rand.Intn(max(1, len(response.KeyValues)-1)) // #nosec G404 _ = slices.Delete(response.KeyValues, i, min(len(response.KeyValues), i+1)) }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewClient(t, context.Background(), handler) }, }, { name: "range proof bad response - start and end proof nodes removed", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.StartProof = nil response.EndProof = nil }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewClient(t, context.Background(), handler) }, }, { name: "range proof bad response - end proof removed", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.EndProof = nil }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewClient(t, context.Background(), handler) }, }, { name: "range proof bad response - empty proof", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.StartProof = nil response.EndProof = nil response.KeyValues = nil }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewClient(t, context.Background(), handler) }, }, { name: "range proof server flake", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { return p2ptest.NewClient(t, context.Background(), &flakyHandler{ - Handler: NewGetRangeProofHandler(logging.NoLog{}, db), + Handler: NewSyncGetRangeProofHandler(logging.NoLog{}, db), c: &counter{m: 2}, - }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + }) }, }, { name: "change proof bad response - too many keys in response", changeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { + handler := newModifiedChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { response.KeyChanges = append(response.KeyChanges, make([]merkledb.KeyChange, defaultRequestKeyLimit)...) }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewClient(t, context.Background(), handler) }, }, { name: "change proof bad response - removed first key in response", changeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { + handler := newModifiedChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { response.KeyChanges = response.KeyChanges[min(1, len(response.KeyChanges)):] }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewClient(t, context.Background(), handler) }, }, { name: "change proof bad response - removed key from middle of response", changeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { + handler := newModifiedChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { i := rand.Intn(max(1, len(response.KeyChanges)-1)) // #nosec G404 _ = slices.Delete(response.KeyChanges, i, min(len(response.KeyChanges), i+1)) }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewClient(t, context.Background(), handler) }, }, { - name: "change proof bad response - all proof keys removed from response", + name: "all proof keys removed from response", changeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newFlakyChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { + handler := newModifiedChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { response.StartProof = nil response.EndProof = nil }) - return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + return p2ptest.NewClient(t, context.Background(), handler) }, }, { - name: "change proof flaky server", + name: "flaky change proof client", changeProofClient: func(db merkledb.MerkleDB) *p2p.Client { return p2ptest.NewClient(t, context.Background(), &flakyHandler{ - Handler: NewGetChangeProofHandler(logging.NoLog{}, db), + Handler: NewSyncGetChangeProofHandler(logging.NoLog{}, db), c: &counter{m: 2}, - }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + }) }, }, } @@ -945,14 +947,14 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { changeProofClient *p2p.Client ) - rangeProofHandler := NewGetRangeProofHandler(logging.NoLog{}, dbToSync) - rangeProofClient = p2ptest.NewClient(t, ctx, rangeProofHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + rangeProofHandler := NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync) + rangeProofClient = p2ptest.NewClient(t, ctx, rangeProofHandler) if tt.rangeProofClient != nil { rangeProofClient = tt.rangeProofClient(dbToSync) } - changeProofHandler := NewGetChangeProofHandler(logging.NoLog{}, dbToSync) - changeProofClient = p2ptest.NewClient(t, ctx, changeProofHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + changeProofHandler := NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync) + changeProofClient = p2ptest.NewClient(t, ctx, changeProofHandler) if tt.changeProofClient != nil { changeProofClient = tt.changeProofClient(dbToSync) } @@ -974,12 +976,8 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { require.NoError(syncer.Start(ctx)) // Simulate writes on the server - // - // TODO add more writes when api is not flaky. There is an inherent - // race condition in between writes where UpdateSyncTarget might - // error because it has already reached the sync target before it - // is called. - for i := 0; i < 50; i++ { + // TODO more than a single write when API is less flaky + for i := 0; i <= 1; i++ { addkey := make([]byte, r.Intn(50)) _, err = r.Read(addkey) require.NoError(err) @@ -1031,8 +1029,8 @@ func Test_Sync_Result_Correct_Root_With_Sync_Restart(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync)), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync)), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -1058,8 +1056,8 @@ func Test_Sync_Result_Correct_Root_With_Sync_Restart(t *testing.T) { newSyncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync)), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync)), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -1128,15 +1126,15 @@ func Test_Sync_Result_Correct_Root_Update_Root_During(t *testing.T) { updatedRootChan <- struct{}{} ctx := context.Background() - rangeProofClient := p2ptest.NewClient(t, ctx, &waitingHandler{ - handler: NewGetRangeProofHandler(logging.NoLog{}, dbToSync), + rangeProofClient := p2ptest.NewClient(t, ctx, &testHandler{ + handler: NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), updatedRootChan: updatedRootChan, - }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + }) - changeProofClient := p2ptest.NewClient(t, ctx, &waitingHandler{ - handler: NewGetChangeProofHandler(logging.NoLog{}, dbToSync), + changeProofClient := p2ptest.NewClient(t, ctx, &testHandler{ + handler: NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), updatedRootChan: updatedRootChan, - }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) + }) syncer, err := NewManager(ManagerConfig{ DB: db, @@ -1184,11 +1182,10 @@ func Test_Sync_UpdateSyncTarget(t *testing.T) { newDefaultDBConfig(), ) require.NoError(err) - ctx := context.Background() m, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: &p2p.Client{}, + ChangeProofClient: &p2p.Client{}, TargetRoot: ids.Empty, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -1287,3 +1284,14 @@ func generateTrieWithMinKeyLen(t *testing.T, r *rand.Rand, count int, minKeyLen } return db, batch.Write() } + +type testHandler struct { + p2p.NoOpHandler + handler p2p.Handler + updatedRootChan chan struct{} +} + +func (t *testHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *common.AppError) { + <-t.updatedRootChan + return t.handler.AppRequest(ctx, nodeID, deadline, requestBytes) +} From 34fe3a673fd486d13e1c57f8cee74a7c75abe2a2 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 10 Sep 2024 16:28:56 -0400 Subject: [PATCH 02/16] nit Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/p2p/p2ptest/client.go | 10 +++-- network/p2p/p2ptest/client_test.go | 4 +- x/sync/network_server.go | 48 ++++++++++-------------- x/sync/sync_test.go | 60 +++++++++++++++--------------- 4 files changed, 59 insertions(+), 63 deletions(-) diff --git a/network/p2p/p2ptest/client.go b/network/p2p/p2ptest/client.go index 747904b40ffb..8d41d6b99bce 100644 --- a/network/p2p/p2ptest/client.go +++ b/network/p2p/p2ptest/client.go @@ -21,15 +21,19 @@ import ( // NewClient generates a client-server pair and returns the client used to // communicate with a server with the specified handler -func NewClient(t *testing.T, rootCtx context.Context, handler p2p.Handler) *p2p.Client { +func NewClient( + t *testing.T, + rootCtx context.Context, + handler p2p.Handler, + clientNodeID ids.NodeID, + serverNodeID ids.NodeID, +) *p2p.Client { clientSender := &enginetest.Sender{} serverSender := &enginetest.Sender{} - clientNodeID := ids.GenerateTestNodeID() clientNetwork, err := p2p.NewNetwork(logging.NoLog{}, clientSender, prometheus.NewRegistry(), "") require.NoError(t, err) - serverNodeID := ids.GenerateTestNodeID() serverNetwork, err := p2p.NewNetwork(logging.NoLog{}, serverSender, prometheus.NewRegistry(), "") require.NoError(t, err) diff --git a/network/p2p/p2ptest/client_test.go b/network/p2p/p2ptest/client_test.go index cef624aaccbc..45ae970ecf0f 100644 --- a/network/p2p/p2ptest/client_test.go +++ b/network/p2p/p2ptest/client_test.go @@ -27,7 +27,7 @@ func TestNewClient_AppGossip(t *testing.T) { }, } - client := NewClient(t, ctx, testHandler) + client := NewClient(t, ctx, testHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) require.NoError(client.AppGossip(ctx, common.SendConfig{}, []byte("foobar"))) <-appGossipChan } @@ -94,7 +94,7 @@ func TestNewClient_AppRequest(t *testing.T) { }, } - client := NewClient(t, ctx, testHandler) + client := NewClient(t, ctx, testHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) require.NoError(tt.appRequestF( ctx, client, diff --git a/x/sync/network_server.go b/x/sync/network_server.go index ec70c2335b64..10d86ed140eb 100644 --- a/x/sync/network_server.go +++ b/x/sync/network_server.go @@ -75,15 +75,15 @@ type SyncGetChangeProofHandler struct { func (*SyncGetChangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {} func (s *SyncGetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) { - request := &pb.SyncGetChangeProofRequest{} - if err := proto.Unmarshal(requestBytes, request); err != nil { + req := &pb.SyncGetChangeProofRequest{} + if err := proto.Unmarshal(requestBytes, req); err != nil { return nil, &common.AppError{ Code: p2p.ErrUnexpected.Code, Message: fmt.Sprintf("failed to unmarshal request: %s", err), } } - if err := validateChangeProofRequest(request); err != nil { + if err := validateChangeProofRequest(req); err != nil { return nil, &common.AppError{ Code: p2p.ErrUnexpected.Code, Message: fmt.Sprintf("invalid request: %s", err), @@ -92,13 +92,13 @@ func (s *SyncGetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID // override limits if they exceed caps var ( - keyLimit = min(request.KeyLimit, maxKeyValuesLimit) - bytesLimit = min(int(request.BytesLimit), maxByteSizeLimit) - start = maybeBytesToMaybe(request.StartKey) - end = maybeBytesToMaybe(request.EndKey) + keyLimit = min(req.KeyLimit, maxKeyValuesLimit) + bytesLimit = min(int(req.BytesLimit), maxByteSizeLimit) + start = maybeBytesToMaybe(req.StartKey) + end = maybeBytesToMaybe(req.EndKey) ) - startRoot, err := ids.ToID(request.StartRootHash) + startRoot, err := ids.ToID(req.StartRootHash) if err != nil { return nil, &common.AppError{ Code: p2p.ErrUnexpected.Code, @@ -106,7 +106,7 @@ func (s *SyncGetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID } } - endRoot, err := ids.ToID(request.EndRootHash) + endRoot, err := ids.ToID(req.EndRootHash) if err != nil { return nil, &common.AppError{ Code: p2p.ErrUnexpected.Code, @@ -140,11 +140,11 @@ func (s *SyncGetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID ctx, g.db, &pb.SyncGetRangeProofRequest{ - RootHash: request.EndRootHash, - StartKey: request.StartKey, - EndKey: request.EndKey, - KeyLimit: request.KeyLimit, - BytesLimit: request.BytesLimit, + RootHash: req.EndRootHash, + StartKey: req.StartKey, + EndKey: req.EndKey, + KeyLimit: req.KeyLimit, + BytesLimit: req.BytesLimit, }, func(rangeProof *merkledb.RangeProof) ([]byte, error) { return proto.Marshal(&pb.SyncGetChangeProofResponse{ @@ -191,10 +191,6 @@ func (s *SyncGetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID } } -func (*SyncGetChangeProofHandler) CrossChainAppRequest(context.Context, ids.ID, time.Time, []byte) ([]byte, error) { - return nil, nil -} - func NewSyncGetRangeProofHandler(log logging.Logger, db DB) *SyncGetRangeProofHandler { return &SyncGetRangeProofHandler{ log: log, @@ -210,15 +206,15 @@ type SyncGetRangeProofHandler struct { func (*SyncGetRangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {} func (s *SyncGetRangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) { - request := &pb.SyncGetRangeProofRequest{} - if err := proto.Unmarshal(requestBytes, request); err != nil { + req := &pb.SyncGetRangeProofRequest{} + if err := proto.Unmarshal(requestBytes, req); err != nil { return nil, &common.AppError{ Code: p2p.ErrUnexpected.Code, Message: fmt.Sprintf("failed to unmarshal request: %s", err), } } - if err := validateRangeProofRequest(request); err != nil { + if err := validateRangeProofRequest(req); err != nil { return nil, &common.AppError{ Code: p2p.ErrUnexpected.Code, Message: fmt.Sprintf("invalid range proof request: %s", err), @@ -226,13 +222,13 @@ func (s *SyncGetRangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, } // override limits if they exceed caps - request.KeyLimit = min(request.KeyLimit, maxKeyValuesLimit) - request.BytesLimit = min(request.BytesLimit, maxByteSizeLimit) + req.KeyLimit = min(req.KeyLimit, maxKeyValuesLimit) + req.BytesLimit = min(req.BytesLimit, maxByteSizeLimit) proofBytes, err := getRangeProof( ctx, s.db, - request, + req, func(rangeProof *merkledb.RangeProof) ([]byte, error) { return proto.Marshal(rangeProof.ToProto()) }, @@ -247,10 +243,6 @@ func (s *SyncGetRangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, return proofBytes, nil } -func (*SyncGetRangeProofHandler) CrossChainAppRequest(context.Context, ids.ID, time.Time, []byte) ([]byte, error) { - return nil, nil -} - // Get the range proof specified by [req]. // If the generated proof is too large, the key limit is reduced // and the proof is regenerated. This process is repeated until diff --git a/x/sync/sync_test.go b/x/sync/sync_test.go index db480f90f0c7..7373c08bf5c5 100644 --- a/x/sync/sync_test.go +++ b/x/sync/sync_test.go @@ -73,8 +73,8 @@ func Test_Completion(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, emptyDB)), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, emptyDB)), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, emptyDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, emptyDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: emptyRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -178,8 +178,8 @@ func Test_Sync_FindNextKey_InSync(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync)), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync)), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -377,8 +377,8 @@ func Test_Sync_FindNextKey_ExtraValues(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync)), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync)), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -440,7 +440,7 @@ func TestFindNextKeyEmptyEndProof(t *testing.T) { syncer, err := NewManager(ManagerConfig{ DB: db, RangeProofClient: &p2p.Client{}, - ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, db)), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: ids.Empty, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -508,8 +508,8 @@ func Test_Sync_FindNextKey_DifferentChild(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync)), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync)), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -779,7 +779,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { response.KeyValues = append(response.KeyValues, merkledb.KeyValue{}) }) - return p2ptest.NewClient(t, context.Background(), handler) + return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) }, }, { @@ -789,7 +789,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { response.KeyValues = response.KeyValues[min(1, len(response.KeyValues)):] }) - return p2ptest.NewClient(t, context.Background(), handler) + return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) }, }, { @@ -815,7 +815,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { } }) - return p2ptest.NewClient(t, context.Background(), handler) + return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) }, }, { @@ -826,7 +826,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { _ = slices.Delete(response.KeyValues, i, min(len(response.KeyValues), i+1)) }) - return p2ptest.NewClient(t, context.Background(), handler) + return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) }, }, { @@ -837,7 +837,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { response.EndProof = nil }) - return p2ptest.NewClient(t, context.Background(), handler) + return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) }, }, { @@ -847,7 +847,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { response.EndProof = nil }) - return p2ptest.NewClient(t, context.Background(), handler) + return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) }, }, { @@ -859,7 +859,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { response.KeyValues = nil }) - return p2ptest.NewClient(t, context.Background(), handler) + return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) }, }, { @@ -868,7 +868,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { return p2ptest.NewClient(t, context.Background(), &flakyHandler{ Handler: NewSyncGetRangeProofHandler(logging.NoLog{}, db), c: &counter{m: 2}, - }) + }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) }, }, { @@ -878,7 +878,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { response.KeyChanges = append(response.KeyChanges, make([]merkledb.KeyChange, defaultRequestKeyLimit)...) }) - return p2ptest.NewClient(t, context.Background(), handler) + return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) }, }, { @@ -888,7 +888,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { response.KeyChanges = response.KeyChanges[min(1, len(response.KeyChanges)):] }) - return p2ptest.NewClient(t, context.Background(), handler) + return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) }, }, { @@ -899,7 +899,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { _ = slices.Delete(response.KeyChanges, i, min(len(response.KeyChanges), i+1)) }) - return p2ptest.NewClient(t, context.Background(), handler) + return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) }, }, { @@ -910,7 +910,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { response.EndProof = nil }) - return p2ptest.NewClient(t, context.Background(), handler) + return p2ptest.NewClient(t, context.Background(), handler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) }, }, { @@ -919,7 +919,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { return p2ptest.NewClient(t, context.Background(), &flakyHandler{ Handler: NewSyncGetChangeProofHandler(logging.NoLog{}, db), c: &counter{m: 2}, - }) + }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) }, }, } @@ -948,13 +948,13 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { ) rangeProofHandler := NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync) - rangeProofClient = p2ptest.NewClient(t, ctx, rangeProofHandler) + rangeProofClient = p2ptest.NewClient(t, ctx, rangeProofHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) if tt.rangeProofClient != nil { rangeProofClient = tt.rangeProofClient(dbToSync) } changeProofHandler := NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync) - changeProofClient = p2ptest.NewClient(t, ctx, changeProofHandler) + changeProofClient = p2ptest.NewClient(t, ctx, changeProofHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) if tt.changeProofClient != nil { changeProofClient = tt.changeProofClient(dbToSync) } @@ -1029,8 +1029,8 @@ func Test_Sync_Result_Correct_Root_With_Sync_Restart(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync)), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync)), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -1056,8 +1056,8 @@ func Test_Sync_Result_Correct_Root_With_Sync_Restart(t *testing.T) { newSyncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync)), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync)), + RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -1129,12 +1129,12 @@ func Test_Sync_Result_Correct_Root_Update_Root_During(t *testing.T) { rangeProofClient := p2ptest.NewClient(t, ctx, &testHandler{ handler: NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), updatedRootChan: updatedRootChan, - }) + }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) changeProofClient := p2ptest.NewClient(t, ctx, &testHandler{ handler: NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), updatedRootChan: updatedRootChan, - }) + }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) syncer, err := NewManager(ManagerConfig{ DB: db, From b9d550771feb4b4bcae3342a34774b1cdf567c39 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 10 Sep 2024 16:28:56 -0400 Subject: [PATCH 03/16] add acp-118 implementation Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/acp118/aggregator.go | 229 ++++++++++++++++++++++++++++++ network/acp118/aggregator_test.go | 205 ++++++++++++++++++++++++++ network/acp118/handler.go | 107 ++++++++++++++ network/acp118/handler_test.go | 118 +++++++++++++++ network/p2p/error.go | 6 + 5 files changed, 665 insertions(+) create mode 100644 network/acp118/aggregator.go create mode 100644 network/acp118/aggregator_test.go create mode 100644 network/acp118/handler.go create mode 100644 network/acp118/handler_test.go diff --git a/network/acp118/aggregator.go b/network/acp118/aggregator.go new file mode 100644 index 000000000000..9cbc10a9442c --- /dev/null +++ b/network/acp118/aggregator.go @@ -0,0 +1,229 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package acp118 + +import ( + "context" + "errors" + "fmt" + "sync" + + "go.uber.org/zap" + "golang.org/x/sync/semaphore" + "google.golang.org/protobuf/proto" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/proto/pb/sdk" + "github.com/ava-labs/avalanchego/utils/crypto/bls" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/vms/platformvm/warp" +) + +var ( + ErrDuplicateValidator = errors.New("duplicate validator") + ErrInsufficientSignatures = errors.New("failed to aggregate sufficient stake weight of signatures") +) + +type result struct { + message *warp.Message + err error +} + +type Validator struct { + NodeID ids.NodeID + PublicKey *bls.PublicKey + Weight uint64 +} + +type indexedValidator struct { + Validator + I int +} + +// NewSignatureAggregator returns an instance of SignatureAggregator +func NewSignatureAggregator( + log logging.Logger, + client *p2p.Client, + maxPending int, +) *SignatureAggregator { + return &SignatureAggregator{ + log: log, + client: client, + maxPending: int64(maxPending), + } +} + +// SignatureAggregator aggregates validator signatures for warp messages +type SignatureAggregator struct { + log logging.Logger + client *p2p.Client + maxPending int64 +} + +// AggregateSignatures blocks until stakeWeightThreshold of validators signs the +// provided message. Validators are issued requests in the caller-specified +// order. +func (s *SignatureAggregator) AggregateSignatures( + parentCtx context.Context, + message *warp.UnsignedMessage, + justification []byte, + validators []Validator, + stakeWeightThreshold uint64, +) (*warp.Message, error) { + ctx, cancel := context.WithCancel(parentCtx) + defer cancel() + + request := &sdk.SignatureRequest{ + Message: message.Bytes(), + Justification: justification, + } + + requestBytes, err := proto.Marshal(request) + if err != nil { + return nil, fmt.Errorf("failed to marshal signature request: %w", err) + } + + done := make(chan result) + pendingRequests := semaphore.NewWeighted(s.maxPending) + lock := &sync.Mutex{} + aggregatedStakeWeight := uint64(0) + attemptedStakeWeight := uint64(0) + totalStakeWeight := uint64(0) + signatures := make([]*bls.Signature, 0) + signerBitSet := set.NewBits() + + nodeIDsToValidator := make(map[ids.NodeID]indexedValidator) + for i, v := range validators { + totalStakeWeight += v.Weight + + // Sanity check the validator set provided by the caller + if _, ok := nodeIDsToValidator[v.NodeID]; ok { + return nil, fmt.Errorf("%w: %s", ErrDuplicateValidator, v.NodeID) + } + + nodeIDsToValidator[v.NodeID] = indexedValidator{ + I: i, + Validator: v, + } + } + + onResponse := func( + _ context.Context, + nodeID ids.NodeID, + responseBytes []byte, + err error, + ) { + // We are guaranteed a response from a node in the validator set + validator := nodeIDsToValidator[nodeID] + + defer func() { + lock.Lock() + attemptedStakeWeight += validator.Weight + remainingStakeWeight := totalStakeWeight - attemptedStakeWeight + failed := remainingStakeWeight < stakeWeightThreshold + lock.Unlock() + + if failed { + done <- result{err: ErrInsufficientSignatures} + } + + pendingRequests.Release(1) + }() + + if err != nil { + s.log.Debug( + "dropping response", + zap.Stringer("nodeID", nodeID), + zap.Error(err), + ) + return + } + + response := &sdk.SignatureResponse{} + if err := proto.Unmarshal(responseBytes, response); err != nil { + s.log.Debug( + "dropping response", + zap.Stringer("nodeID", nodeID), + zap.Error(err), + ) + return + } + + signature, err := bls.SignatureFromBytes(response.Signature) + if err != nil { + s.log.Debug( + "dropping response", + zap.Stringer("nodeID", nodeID), + zap.String("reason", "invalid signature"), + zap.Error(err), + ) + return + } + + if !bls.Verify(validator.PublicKey, signature, message.Bytes()) { + s.log.Debug( + "dropping response", + zap.Stringer("nodeID", nodeID), + zap.String("reason", "public key failed verification"), + ) + return + } + + lock.Lock() + signerBitSet.Add(validator.I) + signatures = append(signatures, signature) + aggregatedStakeWeight += validator.Weight + + if aggregatedStakeWeight >= stakeWeightThreshold { + aggregateSignature, err := bls.AggregateSignatures(signatures) + if err != nil { + done <- result{err: err} + lock.Unlock() + return + } + + bitSetSignature := &warp.BitSetSignature{ + Signers: signerBitSet.Bytes(), + Signature: [bls.SignatureLen]byte{}, + } + + copy(bitSetSignature.Signature[:], bls.SignatureToBytes(aggregateSignature)) + signedMessage, err := warp.NewMessage(message, bitSetSignature) + done <- result{message: signedMessage, err: err} + lock.Unlock() + return + } + + lock.Unlock() + } + + for _, validator := range validators { + if err := pendingRequests.Acquire(ctx, 1); err != nil { + return nil, err + } + + // Avoid loop shadowing in goroutine + validatorCopy := validator + go func() { + if err := s.client.AppRequest( + ctx, + set.Of(validatorCopy.NodeID), + requestBytes, + onResponse, + ); err != nil { + done <- result{err: err} + return + } + }() + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case r := <-done: + return r.message, r.err + } +} diff --git a/network/acp118/aggregator_test.go b/network/acp118/aggregator_test.go new file mode 100644 index 000000000000..50622fc4ac99 --- /dev/null +++ b/network/acp118/aggregator_test.go @@ -0,0 +1,205 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package acp118 + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/network/p2p/p2ptest" + "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/snow/validators/validatorstest" + "github.com/ava-labs/avalanchego/utils/crypto/bls" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/vms/platformvm/warp" +) + +func TestVerifier_Verify(t *testing.T) { + nodeID0 := ids.GenerateTestNodeID() + sk0, err := bls.NewSecretKey() + require.NoError(t, err) + pk0 := bls.PublicFromSecretKey(sk0) + + nodeID1 := ids.GenerateTestNodeID() + sk1, err := bls.NewSecretKey() + require.NoError(t, err) + pk1 := bls.PublicFromSecretKey(sk1) + + networkID := uint32(123) + subnetID := ids.GenerateTestID() + chainID := ids.GenerateTestID() + signer := warp.NewSigner(sk0, networkID, chainID) + + tests := []struct { + name string + + handler p2p.Handler + + ctx context.Context + validators []Validator + threshold uint64 + + pChainState validators.State + pChainHeight uint64 + quorumNum uint64 + quorumDen uint64 + + wantAggregateSignaturesErr error + wantVerifyErr error + }{ + { + name: "passes attestation and verification", + handler: NewHandler(&testAttestor{}, signer, networkID, chainID), + ctx: context.Background(), + validators: []Validator{ + { + NodeID: nodeID0, + PublicKey: pk0, + Weight: 1, + }, + }, + threshold: 1, + pChainState: &validatorstest.State{ + T: t, + GetSubnetIDF: func(context.Context, ids.ID) (ids.ID, error) { + return subnetID, nil + }, + GetValidatorSetF: func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { + return map[ids.NodeID]*validators.GetValidatorOutput{ + nodeID0: { + NodeID: nodeID0, + PublicKey: pk0, + Weight: 1, + }, + }, nil + }, + }, + quorumNum: 1, + quorumDen: 1, + }, + { + name: "passes attestation and fails verification - insufficient stake", + handler: NewHandler(&testAttestor{}, signer, networkID, chainID), + ctx: context.Background(), + validators: []Validator{ + { + NodeID: nodeID0, + PublicKey: pk0, + Weight: 1, + }, + }, + threshold: 1, + pChainState: &validatorstest.State{ + T: t, + GetSubnetIDF: func(context.Context, ids.ID) (ids.ID, error) { + return subnetID, nil + }, + GetValidatorSetF: func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { + return map[ids.NodeID]*validators.GetValidatorOutput{ + nodeID0: { + NodeID: nodeID0, + PublicKey: pk0, + Weight: 1, + }, + nodeID1: { + NodeID: nodeID1, + PublicKey: pk1, + Weight: 1, + }, + }, nil + }, + }, + quorumNum: 2, + quorumDen: 2, + wantVerifyErr: warp.ErrInsufficientWeight, + }, + { + name: "fails attestation", + handler: NewHandler( + &testAttestor{Err: errors.New("foobar")}, + signer, + networkID, + chainID, + ), + ctx: context.Background(), + validators: []Validator{ + { + NodeID: nodeID0, + PublicKey: pk0, + Weight: 1, + }, + }, + threshold: 1, + wantAggregateSignaturesErr: ErrInsufficientSignatures, + }, + { + name: "invalid validator set", + ctx: context.Background(), + validators: []Validator{ + { + NodeID: nodeID0, + PublicKey: pk0, + Weight: 1, + }, + { + NodeID: nodeID0, + PublicKey: pk0, + Weight: 1, + }, + }, + wantAggregateSignaturesErr: ErrDuplicateValidator, + }, + { + name: "context canceled", + ctx: func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + return ctx + }(), + wantAggregateSignaturesErr: context.Canceled, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + + ctx := context.Background() + message, err := warp.NewUnsignedMessage(networkID, chainID, []byte("payload")) + require.NoError(err) + client := p2ptest.NewClient(t, ctx, tt.handler, ids.GenerateTestNodeID(), nodeID0) + verifier := NewSignatureAggregator(logging.NoLog{}, client, 1) + + signedMessage, err := verifier.AggregateSignatures( + tt.ctx, + message, + []byte("justification"), + tt.validators, + tt.threshold, + ) + require.ErrorIs(err, tt.wantAggregateSignaturesErr) + + if signedMessage == nil { + return + } + + err = signedMessage.Signature.Verify( + ctx, + &signedMessage.UnsignedMessage, + networkID, + tt.pChainState, + 0, + tt.quorumNum, + tt.quorumDen, + ) + require.ErrorIs(err, tt.wantVerifyErr) + }) + } +} diff --git a/network/acp118/handler.go b/network/acp118/handler.go new file mode 100644 index 000000000000..e4aae28e3ce6 --- /dev/null +++ b/network/acp118/handler.go @@ -0,0 +1,107 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package acp118 + +import ( + "context" + "fmt" + "time" + + "google.golang.org/protobuf/proto" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/proto/pb/sdk" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/vms/platformvm/warp" +) + +var _ p2p.Handler = (*Handler)(nil) + +// Attestor defines whether to a warp message payload should be attested to +type Attestor interface { + Attest(message *warp.UnsignedMessage, justification []byte) (bool, error) +} + +// NewHandler returns an instance of Handler +func NewHandler( + attestor Attestor, + signer warp.Signer, + networkID uint32, + chainID ids.ID, +) *Handler { + return &Handler{ + attestor: attestor, + signer: signer, + networkID: networkID, + chainID: chainID, + } +} + +// Handler signs warp messages +type Handler struct { + p2p.NoOpHandler + + attestor Attestor + signer warp.Signer + networkID uint32 + chainID ids.ID +} + +func (h *Handler) AppRequest( + _ context.Context, + _ ids.NodeID, + _ time.Time, + requestBytes []byte, +) ([]byte, *common.AppError) { + request := &sdk.SignatureRequest{} + if err := proto.Unmarshal(requestBytes, request); err != nil { + return nil, &common.AppError{ + Code: p2p.ErrUnexpected.Code, + Message: fmt.Sprintf("failed to unmarshal request: %s", err), + } + } + + msg, err := warp.ParseUnsignedMessage(request.Message) + if err != nil { + return nil, &common.AppError{ + Code: p2p.ErrUnexpected.Code, + Message: fmt.Sprintf("failed to initialize warp unsigned message: %s", err), + } + } + + ok, err := h.attestor.Attest(msg, request.Justification) + if err != nil { + return nil, &common.AppError{ + Code: p2p.ErrUnexpected.Code, + Message: fmt.Sprintf("failed to attest request: %s", err), + } + } + + if !ok { + return nil, p2p.ErrAttestFailed + } + + signature, err := h.signer.Sign(msg) + if err != nil { + return nil, &common.AppError{ + Code: p2p.ErrUnexpected.Code, + Message: fmt.Sprintf("failed to sign message: %s", err), + } + } + + response := &sdk.SignatureResponse{ + Signature: signature, + } + + responseBytes, err := proto.Marshal(response) + if err != nil { + return nil, &common.AppError{ + Code: p2p.ErrUnexpected.Code, + Message: fmt.Sprintf("failed to marshal response: %s", err), + } + } + + return responseBytes, nil +} diff --git a/network/acp118/handler_test.go b/network/acp118/handler_test.go new file mode 100644 index 000000000000..77af9e8dd0fb --- /dev/null +++ b/network/acp118/handler_test.go @@ -0,0 +1,118 @@ +// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package acp118 + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/network/p2p/p2ptest" + "github.com/ava-labs/avalanchego/proto/pb/sdk" + "github.com/ava-labs/avalanchego/utils/crypto/bls" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/ava-labs/avalanchego/vms/platformvm/warp" +) + +var _ Attestor = (*testAttestor)(nil) + +func TestHandler(t *testing.T) { + tests := []struct { + name string + attestor Attestor + expectedErr error + expectedVerify bool + }{ + { + name: "signature fails attestation", + attestor: &testAttestor{Err: errors.New("foo")}, + expectedErr: p2p.ErrUnexpected, + }, + { + name: "signature not attested", + attestor: &testAttestor{CantAttest: true}, + expectedErr: p2p.ErrAttestFailed, + }, + { + name: "signature attested", + attestor: &testAttestor{}, + expectedVerify: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + + ctx := context.Background() + sk, err := bls.NewSecretKey() + require.NoError(err) + pk := bls.PublicFromSecretKey(sk) + networkID := uint32(123) + chainID := ids.GenerateTestID() + signer := warp.NewSigner(sk, networkID, chainID) + h := NewHandler(tt.attestor, signer, networkID, chainID) + clientNodeID := ids.GenerateTestNodeID() + serverNodeID := ids.GenerateTestNodeID() + c := p2ptest.NewClient( + t, + ctx, + h, + clientNodeID, + serverNodeID, + ) + + unsignedMessage, err := warp.NewUnsignedMessage( + networkID, + chainID, + []byte("payload"), + ) + require.NoError(err) + + request := &sdk.SignatureRequest{ + Message: unsignedMessage.Bytes(), + Justification: []byte("justification"), + } + + requestBytes, err := proto.Marshal(request) + require.NoError(err) + + done := make(chan struct{}) + onResponse := func(_ context.Context, _ ids.NodeID, responseBytes []byte, appErr error) { + defer close(done) + + if appErr != nil { + require.ErrorIs(tt.expectedErr, appErr) + return + } + + response := &sdk.SignatureResponse{} + require.NoError(proto.Unmarshal(responseBytes, response)) + + signature, err := bls.SignatureFromBytes(response.Signature) + require.NoError(err) + + require.Equal(tt.expectedVerify, bls.Verify(pk, signature, request.Message)) + } + + require.NoError(c.AppRequest(ctx, set.Of(serverNodeID), requestBytes, onResponse)) + <-done + }) + } +} + +// The zero value of testAttestor attests +type testAttestor struct { + CantAttest bool + Err error +} + +func (t testAttestor) Attest(*warp.UnsignedMessage, []byte) (bool, error) { + return !t.CantAttest, t.Err +} diff --git a/network/p2p/error.go b/network/p2p/error.go index 07207319a041..67b0317153e6 100644 --- a/network/p2p/error.go +++ b/network/p2p/error.go @@ -30,4 +30,10 @@ var ( Code: -4, Message: "throttled", } + // ErrAttestFailed should be used to indicate that a request failed + // to be signed due to the peer being unable to attest the message + ErrAttestFailed = &common.AppError{ + Code: -5, + Message: "failed attestation", + } ) From 99f0cde16774eb42cc9e6b00a9ac432e7773e336 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 1 Oct 2024 10:48:26 -0400 Subject: [PATCH 04/16] undo diff Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/acp118/aggregator.go | 229 ------------------------------ network/acp118/aggregator_test.go | 205 -------------------------- x/sync/client_test.go | 19 ++- x/sync/manager.go | 23 +-- x/sync/network_server.go | 27 ++-- x/sync/network_server_test.go | 21 ++- x/sync/sync_test.go | 118 +++++++-------- 7 files changed, 107 insertions(+), 535 deletions(-) delete mode 100644 network/acp118/aggregator.go delete mode 100644 network/acp118/aggregator_test.go diff --git a/network/acp118/aggregator.go b/network/acp118/aggregator.go deleted file mode 100644 index 9cbc10a9442c..000000000000 --- a/network/acp118/aggregator.go +++ /dev/null @@ -1,229 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package acp118 - -import ( - "context" - "errors" - "fmt" - "sync" - - "go.uber.org/zap" - "golang.org/x/sync/semaphore" - "google.golang.org/protobuf/proto" - - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/network/p2p" - "github.com/ava-labs/avalanchego/proto/pb/sdk" - "github.com/ava-labs/avalanchego/utils/crypto/bls" - "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/utils/set" - "github.com/ava-labs/avalanchego/vms/platformvm/warp" -) - -var ( - ErrDuplicateValidator = errors.New("duplicate validator") - ErrInsufficientSignatures = errors.New("failed to aggregate sufficient stake weight of signatures") -) - -type result struct { - message *warp.Message - err error -} - -type Validator struct { - NodeID ids.NodeID - PublicKey *bls.PublicKey - Weight uint64 -} - -type indexedValidator struct { - Validator - I int -} - -// NewSignatureAggregator returns an instance of SignatureAggregator -func NewSignatureAggregator( - log logging.Logger, - client *p2p.Client, - maxPending int, -) *SignatureAggregator { - return &SignatureAggregator{ - log: log, - client: client, - maxPending: int64(maxPending), - } -} - -// SignatureAggregator aggregates validator signatures for warp messages -type SignatureAggregator struct { - log logging.Logger - client *p2p.Client - maxPending int64 -} - -// AggregateSignatures blocks until stakeWeightThreshold of validators signs the -// provided message. Validators are issued requests in the caller-specified -// order. -func (s *SignatureAggregator) AggregateSignatures( - parentCtx context.Context, - message *warp.UnsignedMessage, - justification []byte, - validators []Validator, - stakeWeightThreshold uint64, -) (*warp.Message, error) { - ctx, cancel := context.WithCancel(parentCtx) - defer cancel() - - request := &sdk.SignatureRequest{ - Message: message.Bytes(), - Justification: justification, - } - - requestBytes, err := proto.Marshal(request) - if err != nil { - return nil, fmt.Errorf("failed to marshal signature request: %w", err) - } - - done := make(chan result) - pendingRequests := semaphore.NewWeighted(s.maxPending) - lock := &sync.Mutex{} - aggregatedStakeWeight := uint64(0) - attemptedStakeWeight := uint64(0) - totalStakeWeight := uint64(0) - signatures := make([]*bls.Signature, 0) - signerBitSet := set.NewBits() - - nodeIDsToValidator := make(map[ids.NodeID]indexedValidator) - for i, v := range validators { - totalStakeWeight += v.Weight - - // Sanity check the validator set provided by the caller - if _, ok := nodeIDsToValidator[v.NodeID]; ok { - return nil, fmt.Errorf("%w: %s", ErrDuplicateValidator, v.NodeID) - } - - nodeIDsToValidator[v.NodeID] = indexedValidator{ - I: i, - Validator: v, - } - } - - onResponse := func( - _ context.Context, - nodeID ids.NodeID, - responseBytes []byte, - err error, - ) { - // We are guaranteed a response from a node in the validator set - validator := nodeIDsToValidator[nodeID] - - defer func() { - lock.Lock() - attemptedStakeWeight += validator.Weight - remainingStakeWeight := totalStakeWeight - attemptedStakeWeight - failed := remainingStakeWeight < stakeWeightThreshold - lock.Unlock() - - if failed { - done <- result{err: ErrInsufficientSignatures} - } - - pendingRequests.Release(1) - }() - - if err != nil { - s.log.Debug( - "dropping response", - zap.Stringer("nodeID", nodeID), - zap.Error(err), - ) - return - } - - response := &sdk.SignatureResponse{} - if err := proto.Unmarshal(responseBytes, response); err != nil { - s.log.Debug( - "dropping response", - zap.Stringer("nodeID", nodeID), - zap.Error(err), - ) - return - } - - signature, err := bls.SignatureFromBytes(response.Signature) - if err != nil { - s.log.Debug( - "dropping response", - zap.Stringer("nodeID", nodeID), - zap.String("reason", "invalid signature"), - zap.Error(err), - ) - return - } - - if !bls.Verify(validator.PublicKey, signature, message.Bytes()) { - s.log.Debug( - "dropping response", - zap.Stringer("nodeID", nodeID), - zap.String("reason", "public key failed verification"), - ) - return - } - - lock.Lock() - signerBitSet.Add(validator.I) - signatures = append(signatures, signature) - aggregatedStakeWeight += validator.Weight - - if aggregatedStakeWeight >= stakeWeightThreshold { - aggregateSignature, err := bls.AggregateSignatures(signatures) - if err != nil { - done <- result{err: err} - lock.Unlock() - return - } - - bitSetSignature := &warp.BitSetSignature{ - Signers: signerBitSet.Bytes(), - Signature: [bls.SignatureLen]byte{}, - } - - copy(bitSetSignature.Signature[:], bls.SignatureToBytes(aggregateSignature)) - signedMessage, err := warp.NewMessage(message, bitSetSignature) - done <- result{message: signedMessage, err: err} - lock.Unlock() - return - } - - lock.Unlock() - } - - for _, validator := range validators { - if err := pendingRequests.Acquire(ctx, 1); err != nil { - return nil, err - } - - // Avoid loop shadowing in goroutine - validatorCopy := validator - go func() { - if err := s.client.AppRequest( - ctx, - set.Of(validatorCopy.NodeID), - requestBytes, - onResponse, - ); err != nil { - done <- result{err: err} - return - } - }() - } - - select { - case <-ctx.Done(): - return nil, ctx.Err() - case r := <-done: - return r.message, r.err - } -} diff --git a/network/acp118/aggregator_test.go b/network/acp118/aggregator_test.go deleted file mode 100644 index 50622fc4ac99..000000000000 --- a/network/acp118/aggregator_test.go +++ /dev/null @@ -1,205 +0,0 @@ -// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package acp118 - -import ( - "context" - "errors" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/network/p2p" - "github.com/ava-labs/avalanchego/network/p2p/p2ptest" - "github.com/ava-labs/avalanchego/snow/validators" - "github.com/ava-labs/avalanchego/snow/validators/validatorstest" - "github.com/ava-labs/avalanchego/utils/crypto/bls" - "github.com/ava-labs/avalanchego/utils/logging" - "github.com/ava-labs/avalanchego/vms/platformvm/warp" -) - -func TestVerifier_Verify(t *testing.T) { - nodeID0 := ids.GenerateTestNodeID() - sk0, err := bls.NewSecretKey() - require.NoError(t, err) - pk0 := bls.PublicFromSecretKey(sk0) - - nodeID1 := ids.GenerateTestNodeID() - sk1, err := bls.NewSecretKey() - require.NoError(t, err) - pk1 := bls.PublicFromSecretKey(sk1) - - networkID := uint32(123) - subnetID := ids.GenerateTestID() - chainID := ids.GenerateTestID() - signer := warp.NewSigner(sk0, networkID, chainID) - - tests := []struct { - name string - - handler p2p.Handler - - ctx context.Context - validators []Validator - threshold uint64 - - pChainState validators.State - pChainHeight uint64 - quorumNum uint64 - quorumDen uint64 - - wantAggregateSignaturesErr error - wantVerifyErr error - }{ - { - name: "passes attestation and verification", - handler: NewHandler(&testAttestor{}, signer, networkID, chainID), - ctx: context.Background(), - validators: []Validator{ - { - NodeID: nodeID0, - PublicKey: pk0, - Weight: 1, - }, - }, - threshold: 1, - pChainState: &validatorstest.State{ - T: t, - GetSubnetIDF: func(context.Context, ids.ID) (ids.ID, error) { - return subnetID, nil - }, - GetValidatorSetF: func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { - return map[ids.NodeID]*validators.GetValidatorOutput{ - nodeID0: { - NodeID: nodeID0, - PublicKey: pk0, - Weight: 1, - }, - }, nil - }, - }, - quorumNum: 1, - quorumDen: 1, - }, - { - name: "passes attestation and fails verification - insufficient stake", - handler: NewHandler(&testAttestor{}, signer, networkID, chainID), - ctx: context.Background(), - validators: []Validator{ - { - NodeID: nodeID0, - PublicKey: pk0, - Weight: 1, - }, - }, - threshold: 1, - pChainState: &validatorstest.State{ - T: t, - GetSubnetIDF: func(context.Context, ids.ID) (ids.ID, error) { - return subnetID, nil - }, - GetValidatorSetF: func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { - return map[ids.NodeID]*validators.GetValidatorOutput{ - nodeID0: { - NodeID: nodeID0, - PublicKey: pk0, - Weight: 1, - }, - nodeID1: { - NodeID: nodeID1, - PublicKey: pk1, - Weight: 1, - }, - }, nil - }, - }, - quorumNum: 2, - quorumDen: 2, - wantVerifyErr: warp.ErrInsufficientWeight, - }, - { - name: "fails attestation", - handler: NewHandler( - &testAttestor{Err: errors.New("foobar")}, - signer, - networkID, - chainID, - ), - ctx: context.Background(), - validators: []Validator{ - { - NodeID: nodeID0, - PublicKey: pk0, - Weight: 1, - }, - }, - threshold: 1, - wantAggregateSignaturesErr: ErrInsufficientSignatures, - }, - { - name: "invalid validator set", - ctx: context.Background(), - validators: []Validator{ - { - NodeID: nodeID0, - PublicKey: pk0, - Weight: 1, - }, - { - NodeID: nodeID0, - PublicKey: pk0, - Weight: 1, - }, - }, - wantAggregateSignaturesErr: ErrDuplicateValidator, - }, - { - name: "context canceled", - ctx: func() context.Context { - ctx, cancel := context.WithCancel(context.Background()) - cancel() - - return ctx - }(), - wantAggregateSignaturesErr: context.Canceled, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - require := require.New(t) - - ctx := context.Background() - message, err := warp.NewUnsignedMessage(networkID, chainID, []byte("payload")) - require.NoError(err) - client := p2ptest.NewClient(t, ctx, tt.handler, ids.GenerateTestNodeID(), nodeID0) - verifier := NewSignatureAggregator(logging.NoLog{}, client, 1) - - signedMessage, err := verifier.AggregateSignatures( - tt.ctx, - message, - []byte("justification"), - tt.validators, - tt.threshold, - ) - require.ErrorIs(err, tt.wantAggregateSignaturesErr) - - if signedMessage == nil { - return - } - - err = signedMessage.Signature.Verify( - ctx, - &signedMessage.UnsignedMessage, - networkID, - tt.pChainState, - 0, - tt.quorumNum, - tt.quorumDen, - ) - require.ErrorIs(err, tt.wantVerifyErr) - }) - } -} diff --git a/x/sync/client_test.go b/x/sync/client_test.go index 2633071439da..decc3e20405d 100644 --- a/x/sync/client_test.go +++ b/x/sync/client_test.go @@ -38,12 +38,12 @@ func newDefaultDBConfig() merkledb.Config { } } -func newModifiedRangeProofHandler( +func newFlakyRangeProofHandler( t *testing.T, db merkledb.MerkleDB, modifyResponse func(response *merkledb.RangeProof), ) p2p.Handler { - handler := NewSyncGetRangeProofHandler(logging.NoLog{}, db) + handler := NewGetRangeProofHandler(logging.NoLog{}, db) c := counter{m: 2} return &p2p.TestHandler{ @@ -74,12 +74,12 @@ func newModifiedRangeProofHandler( } } -func newModifiedChangeProofHandler( +func newFlakyChangeProofHandler( t *testing.T, db merkledb.MerkleDB, modifyResponse func(response *merkledb.ChangeProof), ) p2p.Handler { - handler := NewSyncGetChangeProofHandler(logging.NoLog{}, db) + handler := NewGetChangeProofHandler(logging.NoLog{}, db) c := counter{m: 2} return &p2p.TestHandler{ @@ -145,3 +145,14 @@ func (c *counter) Inc() int { c.i++ return result } + +type waitingHandler struct { + p2p.NoOpHandler + handler p2p.Handler + updatedRootChan chan struct{} +} + +func (w *waitingHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *common.AppError) { + <-w.updatedRootChan + return w.handler.AppRequest(ctx, nodeID, deadline, requestBytes) +} diff --git a/x/sync/manager.go b/x/sync/manager.go index dd176c223033..ddcdc1637088 100644 --- a/x/sync/manager.go +++ b/x/sync/manager.go @@ -41,7 +41,7 @@ var ( ErrAlreadyStarted = errors.New("cannot start a Manager that has already been started") ErrAlreadyClosed = errors.New("Manager is closed") ErrNoRangeProofClientProvided = errors.New("range proof client is a required field of the sync config") - ErrNoChangeProofClientProvided = errors.New("change proofclient is a required field of the sync config") + ErrNoChangeProofClientProvided = errors.New("change proof client is a required field of the sync config") ErrNoDatabaseProvided = errors.New("sync database is a required field of the sync config") ErrNoLogProvided = errors.New("log is a required field of the sync config") ErrZeroWorkLimit = errors.New("simultaneous work limit must be greater than 0") @@ -305,7 +305,12 @@ func (m *Manager) doWork(ctx context.Context, work *workItem) { return } - <-time.After(waitTime) + select { + case <-ctx.Done(): + m.finishWorkItem() + return + case <-time.After(waitTime): + } if work.localRootID == ids.Empty { // the keys in this range have not been downloaded, so get all key/values @@ -368,7 +373,8 @@ func (m *Manager) requestChangeProof(ctx context.Context, work *workItem) { defer m.finishWorkItem() if err := m.handleChangeProofResponse(ctx, targetRootID, work, request, responseBytes, err); err != nil { - m.config.Log.Debug("dropping response", zap.Error(err)) + // TODO log responses + m.config.Log.Debug("dropping response", zap.Error(err), zap.Stringer("request", request)) m.retryWork(work) return } @@ -425,7 +431,8 @@ func (m *Manager) requestRangeProof(ctx context.Context, work *workItem) { defer m.finishWorkItem() if err := m.handleRangeProofResponse(ctx, targetRootID, work, request, responseBytes, appErr); err != nil { - m.config.Log.Debug("dropping response", zap.Error(err)) + // TODO log responses + m.config.Log.Debug("dropping response", zap.Error(err), zap.Stringer("request", request)) m.retryWork(work) return } @@ -461,10 +468,11 @@ func (m *Manager) retryWork(work *workItem) { m.workLock.Lock() m.unprocessedWork.Insert(work) m.workLock.Unlock() + m.unprocessedWorkCond.Signal() } // Returns an error if we should drop the response -func (m *Manager) handleResponse( +func (m *Manager) shouldHandleResponse( bytesLimit uint32, responseBytes []byte, err error, @@ -499,7 +507,7 @@ func (m *Manager) handleRangeProofResponse( responseBytes []byte, err error, ) error { - if err := m.handleResponse(request.BytesLimit, responseBytes, err); err != nil { + if err := m.shouldHandleResponse(request.BytesLimit, responseBytes, err); err != nil { return err } @@ -550,7 +558,7 @@ func (m *Manager) handleChangeProofResponse( responseBytes []byte, err error, ) error { - if err := m.handleResponse(request.BytesLimit, responseBytes, err); err != nil { + if err := m.shouldHandleResponse(request.BytesLimit, responseBytes, err); err != nil { return err } @@ -606,7 +614,6 @@ func (m *Manager) handleChangeProofResponse( m.completeWorkItem(ctx, work, largestHandledKey, targetRootID, changeProof.EndProof) case *pb.SyncGetChangeProofResponse_RangeProof: - var rangeProof merkledb.RangeProof if err := rangeProof.UnmarshalProto(changeProofResp.RangeProof); err != nil { return err diff --git a/x/sync/network_server.go b/x/sync/network_server.go index 10d86ed140eb..2153f2fbcc97 100644 --- a/x/sync/network_server.go +++ b/x/sync/network_server.go @@ -49,8 +49,8 @@ var ( errInvalidBounds = errors.New("start key is greater than end key") errInvalidRootHash = fmt.Errorf("root hash must have length %d", hashing.HashLen) - _ p2p.Handler = (*SyncGetChangeProofHandler)(nil) - _ p2p.Handler = (*SyncGetRangeProofHandler)(nil) + _ p2p.Handler = (*GetChangeProofHandler)(nil) + _ p2p.Handler = (*GetRangeProofHandler)(nil) ) func maybeBytesToMaybe(mb *pb.MaybeBytes) maybe.Maybe[[]byte] { @@ -60,21 +60,21 @@ func maybeBytesToMaybe(mb *pb.MaybeBytes) maybe.Maybe[[]byte] { return maybe.Nothing[[]byte]() } -func NewSyncGetChangeProofHandler(log logging.Logger, db DB) *SyncGetChangeProofHandler { - return &SyncGetChangeProofHandler{ +func NewGetChangeProofHandler(log logging.Logger, db DB) *GetChangeProofHandler { + return &GetChangeProofHandler{ log: log, db: db, } } -type SyncGetChangeProofHandler struct { +type GetChangeProofHandler struct { log logging.Logger db DB } -func (*SyncGetChangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {} +func (*GetChangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {} -func (s *SyncGetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) { +func (g *GetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) { req := &pb.SyncGetChangeProofRequest{} if err := proto.Unmarshal(requestBytes, req); err != nil { return nil, &common.AppError{ @@ -120,6 +120,7 @@ func (s *SyncGetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID if !errors.Is(err, merkledb.ErrInsufficientHistory) { // We should only fail to get a change proof if we have insufficient history. // Other errors are unexpected. + // TODO define custom errors return nil, &common.AppError{ Code: p2p.ErrUnexpected.Code, Message: fmt.Sprintf("failed to get change proof: %s", err), @@ -191,21 +192,21 @@ func (s *SyncGetChangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID } } -func NewSyncGetRangeProofHandler(log logging.Logger, db DB) *SyncGetRangeProofHandler { - return &SyncGetRangeProofHandler{ +func NewGetRangeProofHandler(log logging.Logger, db DB) *GetRangeProofHandler { + return &GetRangeProofHandler{ log: log, db: db, } } -type SyncGetRangeProofHandler struct { +type GetRangeProofHandler struct { log logging.Logger db DB } -func (*SyncGetRangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {} +func (*GetRangeProofHandler) AppGossip(context.Context, ids.NodeID, []byte) {} -func (s *SyncGetRangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) { +func (g *GetRangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, *common.AppError) { req := &pb.SyncGetRangeProofRequest{} if err := proto.Unmarshal(requestBytes, req); err != nil { return nil, &common.AppError{ @@ -227,7 +228,7 @@ func (s *SyncGetRangeProofHandler) AppRequest(ctx context.Context, _ ids.NodeID, proofBytes, err := getRangeProof( ctx, - s.db, + g.db, req, func(rangeProof *merkledb.RangeProof) ([]byte, error) { return proto.Marshal(rangeProof.ToProto()) diff --git a/x/sync/network_server_test.go b/x/sync/network_server_test.go index 84dbd1c12682..c78554cea59f 100644 --- a/x/sync/network_server_test.go +++ b/x/sync/network_server_test.go @@ -85,7 +85,7 @@ func Test_Server_GetRangeProof(t *testing.T) { expectedErr: p2p.ErrUnexpected, }, { - name: "key limit too large", + name: "response bounded by key limit", request: &pb.SyncGetRangeProofRequest{ RootHash: smallTrieRoot[:], KeyLimit: 2 * defaultRequestKeyLimit, @@ -94,7 +94,7 @@ func Test_Server_GetRangeProof(t *testing.T) { expectedResponseLen: defaultRequestKeyLimit, }, { - name: "bytes limit too large", + name: "response bounded by byte limit", request: &pb.SyncGetRangeProofRequest{ RootHash: smallTrieRoot[:], KeyLimit: defaultRequestKeyLimit, @@ -118,7 +118,7 @@ func Test_Server_GetRangeProof(t *testing.T) { t.Run(test.name, func(t *testing.T) { require := require.New(t) - handler := NewSyncGetRangeProofHandler(logging.NoLog{}, smallTrieDB) + handler := NewGetRangeProofHandler(logging.NoLog{}, smallTrieDB) requestBytes, err := proto.Marshal(test.request) require.NoError(err) responseBytes, err := handler.AppRequest(context.Background(), test.nodeID, time.Time{}, requestBytes) @@ -130,17 +130,12 @@ func Test_Server_GetRangeProof(t *testing.T) { require.Nil(responseBytes) return } - require.NotNil(responseBytes) - var proof *merkledb.RangeProof - if !test.proofNil { - var proofProto pb.RangeProof - require.NoError(proto.Unmarshal(responseBytes, &proofProto)) + var proofProto pb.RangeProof + require.NoError(proto.Unmarshal(responseBytes, &proofProto)) - var p merkledb.RangeProof - require.NoError(p.UnmarshalProto(&proofProto)) - proof = &p - } + var proof merkledb.RangeProof + require.NoError(proof.UnmarshalProto(&proofProto)) if test.expectedResponseLen > 0 { require.LessOrEqual(len(proof.KeyValues), test.expectedResponseLen) @@ -344,7 +339,7 @@ func Test_Server_GetChangeProof(t *testing.T) { t.Run(test.name, func(t *testing.T) { require := require.New(t) - handler := NewSyncGetChangeProofHandler(logging.NoLog{}, serverDB) + handler := NewGetChangeProofHandler(logging.NoLog{}, serverDB) requestBytes, err := proto.Marshal(test.request) require.NoError(err) diff --git a/x/sync/sync_test.go b/x/sync/sync_test.go index 7373c08bf5c5..41dd5829a7b8 100644 --- a/x/sync/sync_test.go +++ b/x/sync/sync_test.go @@ -19,13 +19,12 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/network/p2p/p2ptest" - "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/maybe" "github.com/ava-labs/avalanchego/x/merkledb" ) -var _ p2p.Handler = (*testHandler)(nil) +var _ p2p.Handler = (*waitingHandler)(nil) func Test_Creation(t *testing.T) { require := require.New(t) @@ -40,8 +39,8 @@ func Test_Creation(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: &p2p.Client{}, - ChangeProofClient: &p2p.Client{}, + RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), SimultaneousWorkLimit: 5, Log: logging.NoLog{}, BranchFactor: merkledb.BranchFactor16, @@ -73,8 +72,8 @@ func Test_Completion(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, emptyDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, emptyDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, emptyDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, emptyDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: emptyRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -178,8 +177,8 @@ func Test_Sync_FindNextKey_InSync(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -254,8 +253,8 @@ func Test_Sync_FindNextKey_Deleted(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: &p2p.Client{}, - ChangeProofClient: &p2p.Client{}, + RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -304,8 +303,8 @@ func Test_Sync_FindNextKey_BranchInLocal(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: &p2p.Client{}, - ChangeProofClient: &p2p.Client{}, + RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: targetRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -341,8 +340,8 @@ func Test_Sync_FindNextKey_BranchInReceived(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: &p2p.Client{}, - ChangeProofClient: &p2p.Client{}, + RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: targetRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -377,8 +376,8 @@ func Test_Sync_FindNextKey_ExtraValues(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -439,8 +438,8 @@ func TestFindNextKeyEmptyEndProof(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: &p2p.Client{}, - ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: ids.Empty, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -508,8 +507,8 @@ func Test_Sync_FindNextKey_DifferentChild(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -541,7 +540,6 @@ func Test_Sync_FindNextKey_DifferentChild(t *testing.T) { // Test findNextKey by computing the expected result in a naive, inefficient // way and comparing it to the actual result - func TestFindNextKeyRandom(t *testing.T) { now := time.Now().UnixNano() t.Logf("seed: %d", now) @@ -732,8 +730,8 @@ func TestFindNextKeyRandom(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: localDB, - RangeProofClient: &p2p.Client{}, - ChangeProofClient: &p2p.Client{}, + RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, remoteDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, remoteDB), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: ids.GenerateTestID(), SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -775,7 +773,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "range proof bad response - too many leaves in response", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.KeyValues = append(response.KeyValues, merkledb.KeyValue{}) }) @@ -785,7 +783,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "range proof bad response - removed first key in response", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.KeyValues = response.KeyValues[min(1, len(response.KeyValues)):] }) @@ -795,7 +793,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "range proof bad response - removed first key in response and replaced proof", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.KeyValues = response.KeyValues[min(1, len(response.KeyValues)):] response.KeyValues = []merkledb.KeyValue{ { @@ -821,7 +819,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "range proof bad response - removed key from middle of response", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { i := rand.Intn(max(1, len(response.KeyValues)-1)) // #nosec G404 _ = slices.Delete(response.KeyValues, i, min(len(response.KeyValues), i+1)) }) @@ -832,7 +830,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "range proof bad response - start and end proof nodes removed", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.StartProof = nil response.EndProof = nil }) @@ -843,7 +841,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "range proof bad response - end proof removed", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.EndProof = nil }) @@ -853,7 +851,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "range proof bad response - empty proof", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newModifiedRangeProofHandler(t, db, func(response *merkledb.RangeProof) { + handler := newFlakyRangeProofHandler(t, db, func(response *merkledb.RangeProof) { response.StartProof = nil response.EndProof = nil response.KeyValues = nil @@ -866,7 +864,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { name: "range proof server flake", rangeProofClient: func(db merkledb.MerkleDB) *p2p.Client { return p2ptest.NewClient(t, context.Background(), &flakyHandler{ - Handler: NewSyncGetRangeProofHandler(logging.NoLog{}, db), + Handler: NewGetRangeProofHandler(logging.NoLog{}, db), c: &counter{m: 2}, }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) }, @@ -874,7 +872,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "change proof bad response - too many keys in response", changeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newModifiedChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { + handler := newFlakyChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { response.KeyChanges = append(response.KeyChanges, make([]merkledb.KeyChange, defaultRequestKeyLimit)...) }) @@ -884,7 +882,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "change proof bad response - removed first key in response", changeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newModifiedChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { + handler := newFlakyChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { response.KeyChanges = response.KeyChanges[min(1, len(response.KeyChanges)):] }) @@ -894,7 +892,7 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { { name: "change proof bad response - removed key from middle of response", changeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newModifiedChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { + handler := newFlakyChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { i := rand.Intn(max(1, len(response.KeyChanges)-1)) // #nosec G404 _ = slices.Delete(response.KeyChanges, i, min(len(response.KeyChanges), i+1)) }) @@ -903,9 +901,9 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { }, }, { - name: "all proof keys removed from response", + name: "change proof bad response - all proof keys removed from response", changeProofClient: func(db merkledb.MerkleDB) *p2p.Client { - handler := newModifiedChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { + handler := newFlakyChangeProofHandler(t, db, func(response *merkledb.ChangeProof) { response.StartProof = nil response.EndProof = nil }) @@ -914,10 +912,10 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { }, }, { - name: "flaky change proof client", + name: "change proof flaky server", changeProofClient: func(db merkledb.MerkleDB) *p2p.Client { return p2ptest.NewClient(t, context.Background(), &flakyHandler{ - Handler: NewSyncGetChangeProofHandler(logging.NoLog{}, db), + Handler: NewGetChangeProofHandler(logging.NoLog{}, db), c: &counter{m: 2}, }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) }, @@ -947,13 +945,13 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { changeProofClient *p2p.Client ) - rangeProofHandler := NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync) + rangeProofHandler := NewGetRangeProofHandler(logging.NoLog{}, dbToSync) rangeProofClient = p2ptest.NewClient(t, ctx, rangeProofHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) if tt.rangeProofClient != nil { rangeProofClient = tt.rangeProofClient(dbToSync) } - changeProofHandler := NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync) + changeProofHandler := NewGetChangeProofHandler(logging.NoLog{}, dbToSync) changeProofClient = p2ptest.NewClient(t, ctx, changeProofHandler, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) if tt.changeProofClient != nil { changeProofClient = tt.changeProofClient(dbToSync) @@ -976,8 +974,12 @@ func Test_Sync_Result_Correct_Root(t *testing.T) { require.NoError(syncer.Start(ctx)) // Simulate writes on the server - // TODO more than a single write when API is less flaky - for i := 0; i <= 1; i++ { + // + // TODO add more writes when api is not flaky. There is an inherent + // race condition in between writes where UpdateSyncTarget might + // error because it has already reached the sync target before it + // is called. + for i := 0; i < 50; i++ { addkey := make([]byte, r.Intn(50)) _, err = r.Read(addkey) require.NoError(err) @@ -1029,8 +1031,8 @@ func Test_Sync_Result_Correct_Root_With_Sync_Restart(t *testing.T) { ctx := context.Background() syncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -1056,8 +1058,8 @@ func Test_Sync_Result_Correct_Root_With_Sync_Restart(t *testing.T) { newSyncer, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), - ChangeProofClient: p2ptest.NewClient(t, ctx, NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, dbToSync), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: syncRoot, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -1126,13 +1128,13 @@ func Test_Sync_Result_Correct_Root_Update_Root_During(t *testing.T) { updatedRootChan <- struct{}{} ctx := context.Background() - rangeProofClient := p2ptest.NewClient(t, ctx, &testHandler{ - handler: NewSyncGetRangeProofHandler(logging.NoLog{}, dbToSync), + rangeProofClient := p2ptest.NewClient(t, ctx, &waitingHandler{ + handler: NewGetRangeProofHandler(logging.NoLog{}, dbToSync), updatedRootChan: updatedRootChan, }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) - changeProofClient := p2ptest.NewClient(t, ctx, &testHandler{ - handler: NewSyncGetChangeProofHandler(logging.NoLog{}, dbToSync), + changeProofClient := p2ptest.NewClient(t, ctx, &waitingHandler{ + handler: NewGetChangeProofHandler(logging.NoLog{}, dbToSync), updatedRootChan: updatedRootChan, }, ids.GenerateTestNodeID(), ids.GenerateTestNodeID()) @@ -1182,10 +1184,11 @@ func Test_Sync_UpdateSyncTarget(t *testing.T) { newDefaultDBConfig(), ) require.NoError(err) + ctx := context.Background() m, err := NewManager(ManagerConfig{ DB: db, - RangeProofClient: &p2p.Client{}, - ChangeProofClient: &p2p.Client{}, + RangeProofClient: p2ptest.NewClient(t, ctx, NewGetRangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), + ChangeProofClient: p2ptest.NewClient(t, ctx, NewGetChangeProofHandler(logging.NoLog{}, db), ids.GenerateTestNodeID(), ids.GenerateTestNodeID()), TargetRoot: ids.Empty, SimultaneousWorkLimit: 5, Log: logging.NoLog{}, @@ -1284,14 +1287,3 @@ func generateTrieWithMinKeyLen(t *testing.T, r *rand.Rand, count int, minKeyLen } return db, batch.Write() } - -type testHandler struct { - p2p.NoOpHandler - handler p2p.Handler - updatedRootChan chan struct{} -} - -func (t *testHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, *common.AppError) { - <-t.updatedRootChan - return t.handler.AppRequest(ctx, nodeID, deadline, requestBytes) -} From 00fc8d1b578c41484d21b26b1b9a165f8191ee3a Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 1 Oct 2024 10:54:09 -0400 Subject: [PATCH 05/16] nit Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/acp118/handler.go | 25 ++++++++----------------- network/acp118/handler_test.go | 12 ++++++------ 2 files changed, 14 insertions(+), 23 deletions(-) diff --git a/network/acp118/handler.go b/network/acp118/handler.go index e4aae28e3ce6..1a9e210e55d8 100644 --- a/network/acp118/handler.go +++ b/network/acp118/handler.go @@ -21,21 +21,17 @@ var _ p2p.Handler = (*Handler)(nil) // Attestor defines whether to a warp message payload should be attested to type Attestor interface { - Attest(message *warp.UnsignedMessage, justification []byte) (bool, error) + Attest(message *warp.UnsignedMessage, justification []byte) (bool, *common.AppError) } // NewHandler returns an instance of Handler func NewHandler( attestor Attestor, signer warp.Signer, - networkID uint32, - chainID ids.ID, ) *Handler { return &Handler{ - attestor: attestor, - signer: signer, - networkID: networkID, - chainID: chainID, + attestor: attestor, + signer: signer, } } @@ -43,10 +39,8 @@ func NewHandler( type Handler struct { p2p.NoOpHandler - attestor Attestor - signer warp.Signer - networkID uint32 - chainID ids.ID + attestor Attestor + signer warp.Signer } func (h *Handler) AppRequest( @@ -71,12 +65,9 @@ func (h *Handler) AppRequest( } } - ok, err := h.attestor.Attest(msg, request.Justification) - if err != nil { - return nil, &common.AppError{ - Code: p2p.ErrUnexpected.Code, - Message: fmt.Sprintf("failed to attest request: %s", err), - } + ok, appErr := h.attestor.Attest(msg, request.Justification) + if appErr != nil { + return nil, appErr } if !ok { diff --git a/network/acp118/handler_test.go b/network/acp118/handler_test.go index 77af9e8dd0fb..c3a9a7de96ee 100644 --- a/network/acp118/handler_test.go +++ b/network/acp118/handler_test.go @@ -5,7 +5,6 @@ package acp118 import ( "context" - "errors" "testing" "github.com/stretchr/testify/require" @@ -15,6 +14,7 @@ import ( "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/network/p2p/p2ptest" "github.com/ava-labs/avalanchego/proto/pb/sdk" + "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/utils/crypto/bls" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/vms/platformvm/warp" @@ -31,8 +31,8 @@ func TestHandler(t *testing.T) { }{ { name: "signature fails attestation", - attestor: &testAttestor{Err: errors.New("foo")}, - expectedErr: p2p.ErrUnexpected, + attestor: &testAttestor{Err: &common.AppError{Code: int32(123)}}, + expectedErr: &common.AppError{Code: int32(123)}, }, { name: "signature not attested", @@ -57,7 +57,7 @@ func TestHandler(t *testing.T) { networkID := uint32(123) chainID := ids.GenerateTestID() signer := warp.NewSigner(sk, networkID, chainID) - h := NewHandler(tt.attestor, signer, networkID, chainID) + h := NewHandler(tt.attestor, signer) clientNodeID := ids.GenerateTestNodeID() serverNodeID := ids.GenerateTestNodeID() c := p2ptest.NewClient( @@ -110,9 +110,9 @@ func TestHandler(t *testing.T) { // The zero value of testAttestor attests type testAttestor struct { CantAttest bool - Err error + Err *common.AppError } -func (t testAttestor) Attest(*warp.UnsignedMessage, []byte) (bool, error) { +func (t testAttestor) Attest(*warp.UnsignedMessage, []byte) (bool, *common.AppError) { return !t.CantAttest, t.Err } From c08a1d71d0245fd36508f2a1ece8824f1beddf49 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 1 Oct 2024 10:55:40 -0400 Subject: [PATCH 06/16] undo Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/p2p/p2ptest/client.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/network/p2p/p2ptest/client.go b/network/p2p/p2ptest/client.go index 8d41d6b99bce..b75654028666 100644 --- a/network/p2p/p2ptest/client.go +++ b/network/p2p/p2ptest/client.go @@ -23,7 +23,7 @@ import ( // communicate with a server with the specified handler func NewClient( t *testing.T, - rootCtx context.Context, + ctx context.Context, handler p2p.Handler, clientNodeID ids.NodeID, serverNodeID ids.NodeID, @@ -38,6 +38,8 @@ func NewClient( require.NoError(t, err) clientSender.SendAppGossipF = func(ctx context.Context, _ common.SendConfig, gossipBytes []byte) error { + // Send the request asynchronously to avoid deadlock when the server + // sends the response back to the client go func() { require.NoError(t, serverNetwork.AppGossip(ctx, clientNodeID, gossipBytes)) }() @@ -56,6 +58,8 @@ func NewClient( } serverSender.SendAppResponseF = func(ctx context.Context, _ ids.NodeID, requestID uint32, responseBytes []byte) error { + // Send the request asynchronously to avoid deadlock when the server + // sends the response back to the client go func() { require.NoError(t, clientNetwork.AppResponse(ctx, serverNodeID, requestID, responseBytes)) }() @@ -64,6 +68,8 @@ func NewClient( } serverSender.SendAppErrorF = func(ctx context.Context, _ ids.NodeID, requestID uint32, errorCode int32, errorMessage string) error { + // Send the request asynchronously to avoid deadlock when the server + // sends the response back to the client go func() { require.NoError(t, clientNetwork.AppRequestFailed(ctx, serverNodeID, requestID, &common.AppError{ Code: errorCode, @@ -74,10 +80,10 @@ func NewClient( return nil } - require.NoError(t, clientNetwork.Connected(rootCtx, clientNodeID, nil)) - require.NoError(t, clientNetwork.Connected(rootCtx, serverNodeID, nil)) - require.NoError(t, serverNetwork.Connected(rootCtx, clientNodeID, nil)) - require.NoError(t, serverNetwork.Connected(rootCtx, serverNodeID, nil)) + require.NoError(t, clientNetwork.Connected(ctx, clientNodeID, nil)) + require.NoError(t, clientNetwork.Connected(ctx, serverNodeID, nil)) + require.NoError(t, serverNetwork.Connected(ctx, clientNodeID, nil)) + require.NoError(t, serverNetwork.Connected(ctx, serverNodeID, nil)) require.NoError(t, serverNetwork.AddHandler(0, handler)) return clientNetwork.NewClient(0) From 9d07a45869d2a90bc54f3592ea6c178680f23ae8 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 1 Oct 2024 10:57:31 -0400 Subject: [PATCH 07/16] nit Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/acp118/handler.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/network/acp118/handler.go b/network/acp118/handler.go index 1a9e210e55d8..1a0c43bb0bc6 100644 --- a/network/acp118/handler.go +++ b/network/acp118/handler.go @@ -25,10 +25,7 @@ type Attestor interface { } // NewHandler returns an instance of Handler -func NewHandler( - attestor Attestor, - signer warp.Signer, -) *Handler { +func NewHandler(attestor Attestor, signer warp.Signer) *Handler { return &Handler{ attestor: attestor, signer: signer, From f91e0a89611e137eb54d7da46e1a4478a7fe6877 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:02:48 -0400 Subject: [PATCH 08/16] nit Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/acp118/handler.go | 11 +++-------- network/acp118/handler_test.go | 13 +++---------- network/p2p/error.go | 6 ------ 3 files changed, 6 insertions(+), 24 deletions(-) diff --git a/network/acp118/handler.go b/network/acp118/handler.go index 1a0c43bb0bc6..0fb4dd6185b6 100644 --- a/network/acp118/handler.go +++ b/network/acp118/handler.go @@ -21,7 +21,7 @@ var _ p2p.Handler = (*Handler)(nil) // Attestor defines whether to a warp message payload should be attested to type Attestor interface { - Attest(message *warp.UnsignedMessage, justification []byte) (bool, *common.AppError) + Attest(message *warp.UnsignedMessage, justification []byte) *common.AppError } // NewHandler returns an instance of Handler @@ -62,13 +62,8 @@ func (h *Handler) AppRequest( } } - ok, appErr := h.attestor.Attest(msg, request.Justification) - if appErr != nil { - return nil, appErr - } - - if !ok { - return nil, p2p.ErrAttestFailed + if err := h.attestor.Attest(msg, request.Justification); err != nil { + return nil, err } signature, err := h.signer.Sign(msg) diff --git a/network/acp118/handler_test.go b/network/acp118/handler_test.go index c3a9a7de96ee..0897e74e39c7 100644 --- a/network/acp118/handler_test.go +++ b/network/acp118/handler_test.go @@ -11,7 +11,6 @@ import ( "google.golang.org/protobuf/proto" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/network/p2p/p2ptest" "github.com/ava-labs/avalanchego/proto/pb/sdk" "github.com/ava-labs/avalanchego/snow/engine/common" @@ -34,11 +33,6 @@ func TestHandler(t *testing.T) { attestor: &testAttestor{Err: &common.AppError{Code: int32(123)}}, expectedErr: &common.AppError{Code: int32(123)}, }, - { - name: "signature not attested", - attestor: &testAttestor{CantAttest: true}, - expectedErr: p2p.ErrAttestFailed, - }, { name: "signature attested", attestor: &testAttestor{}, @@ -109,10 +103,9 @@ func TestHandler(t *testing.T) { // The zero value of testAttestor attests type testAttestor struct { - CantAttest bool - Err *common.AppError + Err *common.AppError } -func (t testAttestor) Attest(*warp.UnsignedMessage, []byte) (bool, *common.AppError) { - return !t.CantAttest, t.Err +func (t testAttestor) Attest(*warp.UnsignedMessage, []byte) *common.AppError { + return t.Err } diff --git a/network/p2p/error.go b/network/p2p/error.go index 67b0317153e6..07207319a041 100644 --- a/network/p2p/error.go +++ b/network/p2p/error.go @@ -30,10 +30,4 @@ var ( Code: -4, Message: "throttled", } - // ErrAttestFailed should be used to indicate that a request failed - // to be signed due to the peer being unable to attest the message - ErrAttestFailed = &common.AppError{ - Code: -5, - Message: "failed attestation", - } ) From a35b09089f30c988ddceb47ecee87becfdae2ddd Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:08:02 -0400 Subject: [PATCH 09/16] add context Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/acp118/handler.go | 10 +++++++--- network/acp118/handler_test.go | 6 +++++- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/network/acp118/handler.go b/network/acp118/handler.go index 0fb4dd6185b6..1dc9ef59fb40 100644 --- a/network/acp118/handler.go +++ b/network/acp118/handler.go @@ -21,7 +21,11 @@ var _ p2p.Handler = (*Handler)(nil) // Attestor defines whether to a warp message payload should be attested to type Attestor interface { - Attest(message *warp.UnsignedMessage, justification []byte) *common.AppError + Attest( + ctx context.Context, + message *warp.UnsignedMessage, + justification []byte, + ) *common.AppError } // NewHandler returns an instance of Handler @@ -41,7 +45,7 @@ type Handler struct { } func (h *Handler) AppRequest( - _ context.Context, + ctx context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte, @@ -62,7 +66,7 @@ func (h *Handler) AppRequest( } } - if err := h.attestor.Attest(msg, request.Justification); err != nil { + if err := h.attestor.Attest(ctx, msg, request.Justification); err != nil { return nil, err } diff --git a/network/acp118/handler_test.go b/network/acp118/handler_test.go index 0897e74e39c7..c5cc827bc1e3 100644 --- a/network/acp118/handler_test.go +++ b/network/acp118/handler_test.go @@ -106,6 +106,10 @@ type testAttestor struct { Err *common.AppError } -func (t testAttestor) Attest(*warp.UnsignedMessage, []byte) *common.AppError { +func (t testAttestor) Attest( + context.Context, + *warp.UnsignedMessage, + []byte, +) *common.AppError { return t.Err } From dd7029cd7564581ae7f2806ba9237a1fa4f714c1 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:08:18 -0400 Subject: [PATCH 10/16] fix Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/acp118/handler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/acp118/handler_test.go b/network/acp118/handler_test.go index c5cc827bc1e3..6a1f866851f6 100644 --- a/network/acp118/handler_test.go +++ b/network/acp118/handler_test.go @@ -95,7 +95,7 @@ func TestHandler(t *testing.T) { require.Equal(tt.expectedVerify, bls.Verify(pk, signature, request.Message)) } - require.NoError(c.AppRequest(ctx, set.Of(serverNodeID), requestBytes, onResponse)) + require.NoError(c.AppRequest(ctx, set.Of(clientNodeID), requestBytes, onResponse)) <-done }) } From 8978452302048f2513647c6455b797819b3d9e1c Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:11:25 -0400 Subject: [PATCH 11/16] rename attestor -> verifier Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/acp118/handler.go | 14 +++++++------- network/acp118/handler_test.go | 20 ++++++++++---------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/network/acp118/handler.go b/network/acp118/handler.go index 1dc9ef59fb40..058c5af9dbfb 100644 --- a/network/acp118/handler.go +++ b/network/acp118/handler.go @@ -19,9 +19,9 @@ import ( var _ p2p.Handler = (*Handler)(nil) -// Attestor defines whether to a warp message payload should be attested to -type Attestor interface { - Attest( +// Verifier defines whether a warp message payload should be verified +type Verifier interface { + Verify( ctx context.Context, message *warp.UnsignedMessage, justification []byte, @@ -29,9 +29,9 @@ type Attestor interface { } // NewHandler returns an instance of Handler -func NewHandler(attestor Attestor, signer warp.Signer) *Handler { +func NewHandler(verifier Verifier, signer warp.Signer) *Handler { return &Handler{ - attestor: attestor, + verifier: verifier, signer: signer, } } @@ -40,7 +40,7 @@ func NewHandler(attestor Attestor, signer warp.Signer) *Handler { type Handler struct { p2p.NoOpHandler - attestor Attestor + verifier Verifier signer warp.Signer } @@ -66,7 +66,7 @@ func (h *Handler) AppRequest( } } - if err := h.attestor.Attest(ctx, msg, request.Justification); err != nil { + if err := h.verifier.Verify(ctx, msg, request.Justification); err != nil { return nil, err } diff --git a/network/acp118/handler_test.go b/network/acp118/handler_test.go index 6a1f866851f6..bdf76f3b942c 100644 --- a/network/acp118/handler_test.go +++ b/network/acp118/handler_test.go @@ -19,23 +19,23 @@ import ( "github.com/ava-labs/avalanchego/vms/platformvm/warp" ) -var _ Attestor = (*testAttestor)(nil) +var _ Verifier = (*testVerifier)(nil) func TestHandler(t *testing.T) { tests := []struct { name string - attestor Attestor + verifier Verifier expectedErr error expectedVerify bool }{ { - name: "signature fails attestation", - attestor: &testAttestor{Err: &common.AppError{Code: int32(123)}}, + name: "signature fails verification", + verifier: &testVerifier{Err: &common.AppError{Code: int32(123)}}, expectedErr: &common.AppError{Code: int32(123)}, }, { - name: "signature attested", - attestor: &testAttestor{}, + name: "signature signed", + verifier: &testVerifier{}, expectedVerify: true, }, } @@ -51,7 +51,7 @@ func TestHandler(t *testing.T) { networkID := uint32(123) chainID := ids.GenerateTestID() signer := warp.NewSigner(sk, networkID, chainID) - h := NewHandler(tt.attestor, signer) + h := NewHandler(tt.verifier, signer) clientNodeID := ids.GenerateTestNodeID() serverNodeID := ids.GenerateTestNodeID() c := p2ptest.NewClient( @@ -101,12 +101,12 @@ func TestHandler(t *testing.T) { } } -// The zero value of testAttestor attests -type testAttestor struct { +// The zero value of testVerifier verifies +type testVerifier struct { Err *common.AppError } -func (t testAttestor) Attest( +func (t testVerifier) Verify( context.Context, *warp.UnsignedMessage, []byte, From a3832095128c0fe69e64da579f1e91b668e5a2e7 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:21:23 -0400 Subject: [PATCH 12/16] nit Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/{ => p2p}/acp118/handler.go | 0 network/{ => p2p}/acp118/handler_test.go | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename network/{ => p2p}/acp118/handler.go (100%) rename network/{ => p2p}/acp118/handler_test.go (100%) diff --git a/network/acp118/handler.go b/network/p2p/acp118/handler.go similarity index 100% rename from network/acp118/handler.go rename to network/p2p/acp118/handler.go diff --git a/network/acp118/handler_test.go b/network/p2p/acp118/handler_test.go similarity index 100% rename from network/acp118/handler_test.go rename to network/p2p/acp118/handler_test.go From 8b52eadf9434ad162f2661702072c26e604bc0f9 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:25:35 -0400 Subject: [PATCH 13/16] nit Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/p2p/acp118/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/acp118/handler.go b/network/p2p/acp118/handler.go index 058c5af9dbfb..8fb39b8a5513 100644 --- a/network/p2p/acp118/handler.go +++ b/network/p2p/acp118/handler.go @@ -19,7 +19,7 @@ import ( var _ p2p.Handler = (*Handler)(nil) -// Verifier defines whether a warp message payload should be verified +// Verifier verifies that a warp message should be signed type Verifier interface { Verify( ctx context.Context, From 34a791114ee64112fb52ecab18761c2cd986fe43 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:56:59 -0400 Subject: [PATCH 14/16] Update network/p2p/acp118/handler.go Co-authored-by: Stephen Buttolph Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/p2p/acp118/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/acp118/handler.go b/network/p2p/acp118/handler.go index 8fb39b8a5513..ebceefb1fb85 100644 --- a/network/p2p/acp118/handler.go +++ b/network/p2p/acp118/handler.go @@ -62,7 +62,7 @@ func (h *Handler) AppRequest( if err != nil { return nil, &common.AppError{ Code: p2p.ErrUnexpected.Code, - Message: fmt.Sprintf("failed to initialize warp unsigned message: %s", err), + Message: fmt.Sprintf("failed to parse warp unsigned message: %s", err), } } From 03f003fa72351bca02ec5b19f5a9658c7a72fea2 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:57:04 -0400 Subject: [PATCH 15/16] Update network/p2p/acp118/handler_test.go Co-authored-by: Stephen Buttolph Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/p2p/acp118/handler_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/network/p2p/acp118/handler_test.go b/network/p2p/acp118/handler_test.go index bdf76f3b942c..fcd5114b4d5c 100644 --- a/network/p2p/acp118/handler_test.go +++ b/network/p2p/acp118/handler_test.go @@ -30,8 +30,8 @@ func TestHandler(t *testing.T) { }{ { name: "signature fails verification", - verifier: &testVerifier{Err: &common.AppError{Code: int32(123)}}, - expectedErr: &common.AppError{Code: int32(123)}, + verifier: &testVerifier{Err: &common.AppError{Code: 123}}, + expectedErr: &common.AppError{Code: 123}, }, { name: "signature signed", From e6b71db074b415d1e290f58c0bb5a93265637712 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:57:09 -0400 Subject: [PATCH 16/16] Update network/p2p/acp118/handler_test.go Co-authored-by: Stephen Buttolph Signed-off-by: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> --- network/p2p/acp118/handler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/p2p/acp118/handler_test.go b/network/p2p/acp118/handler_test.go index fcd5114b4d5c..4ca7add318da 100644 --- a/network/p2p/acp118/handler_test.go +++ b/network/p2p/acp118/handler_test.go @@ -101,7 +101,7 @@ func TestHandler(t *testing.T) { } } -// The zero value of testVerifier verifies +// The zero value of testVerifier allows signing type testVerifier struct { Err *common.AppError }