Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ACP-118 Aggregator #3394

Open
wants to merge 44 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
7c6d0a7
implement acp-118 signature aggregation
joshua-kim Jun 11, 2024
11ada0f
undo
joshua-kim Oct 22, 2024
e650f5d
nit
joshua-kim Oct 22, 2024
81c9487
nit
joshua-kim Nov 12, 2024
0760fcd
improve usage of context
joshua-kim Nov 12, 2024
b6cfd0e
improve doc
joshua-kim Nov 12, 2024
dacd131
nit
joshua-kim Nov 12, 2024
810fb55
rename i -> index
joshua-kim Nov 13, 2024
6f6c0b5
nit
joshua-kim Nov 14, 2024
9c2e9eb
nit
joshua-kim Nov 18, 2024
6589763
nit
joshua-kim Nov 18, 2024
c75e929
int
joshua-kim Nov 19, 2024
ce284e6
nit
joshua-kim Nov 19, 2024
f7e2ca9
nit
joshua-kim Nov 19, 2024
ef9e7ec
nit
joshua-kim Nov 19, 2024
a8642a5
nit
joshua-kim Nov 19, 2024
aa5da47
nit
joshua-kim Nov 19, 2024
a44b653
nit
joshua-kim Nov 19, 2024
26bb1a0
nit
joshua-kim Nov 19, 2024
d2139cc
nit
joshua-kim Nov 19, 2024
4868a99
nit
joshua-kim Nov 19, 2024
c5aaebb
nit
joshua-kim Nov 19, 2024
fecd189
nit
joshua-kim Nov 19, 2024
3b23a22
nit
joshua-kim Nov 19, 2024
420253d
nit
joshua-kim Nov 19, 2024
4c4c380
nit
joshua-kim Nov 19, 2024
738b0ce
nit
joshua-kim Nov 19, 2024
455e03e
nit
joshua-kim Nov 20, 2024
7d7c3c0
nit
joshua-kim Nov 20, 2024
f989cf5
nit
joshua-kim Nov 20, 2024
900279d
nit
joshua-kim Nov 20, 2024
de7b65c
nit
joshua-kim Nov 20, 2024
7c4fa96
nit
joshua-kim Nov 20, 2024
9802ab2
fix tests
joshua-kim Nov 20, 2024
8b6563a
nit
joshua-kim Nov 20, 2024
4480b48
Update network/p2p/acp118/aggregator.go
joshua-kim Nov 21, 2024
67f533e
Update network/p2p/acp118/aggregator.go
joshua-kim Nov 21, 2024
2a9621b
Update network/p2p/acp118/aggregator.go
joshua-kim Nov 21, 2024
3773575
move nil signatures check
joshua-kim Nov 21, 2024
356c36d
nit
joshua-kim Nov 26, 2024
c279876
nti
joshua-kim Nov 26, 2024
7bfe878
wip
joshua-kim Nov 21, 2024
cd716bc
nit
joshua-kim Nov 27, 2024
7752928
nit
joshua-kim Nov 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 266 additions & 0 deletions network/p2p/acp118/aggregator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package acp118

import (
"context"
"errors"
"fmt"
"math/big"

"go.uber.org/zap"
"google.golang.org/protobuf/proto"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/proto/pb/sdk"
"github.com/ava-labs/avalanchego/utils/crypto/bls"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/vms/platformvm/warp"
)

var errFailedVerification = errors.New("failed verification")

type indexedValidator struct {
*warp.Validator
Index int
}

type result struct {
NodeID ids.NodeID
Validator indexedValidator
Signature *bls.Signature
Err error
ShouldClose bool
}

// NewSignatureAggregator returns an instance of SignatureAggregator
func NewSignatureAggregator(log logging.Logger, client *p2p.Client) *SignatureAggregator {
return &SignatureAggregator{
log: log,
client: client,
}
}

// SignatureAggregator aggregates validator signatures for warp messages
type SignatureAggregator struct {
log logging.Logger
client *p2p.Client
}

// AggregateSignatures blocks until quorumNum/quorumDen signatures from
// validators are requested to be aggregated into a warp message or the context
// is canceled. Returns the signed message and the amount of stake that signed
// the message. Caller is responsible for providing a well-formed canonical
// validator set corresponding to the signer bitset in the message.
func (s *SignatureAggregator) AggregateSignatures(
ctx context.Context,
message *warp.Message,
justification []byte,
validators []*warp.Validator,
quorumNum uint64,
quorumDen uint64,
) (
_ *warp.Message,
aggregatedStake *big.Int,
totalStake *big.Int,
finished bool,
_ error,
) {
request := &sdk.SignatureRequest{
Message: message.UnsignedMessage.Bytes(),
Justification: justification,
}

requestBytes, err := proto.Marshal(request)
if err != nil {
return nil, nil, nil, false, fmt.Errorf("failed to marshal signature request: %w", err)
}

publicKeysToValidators := make(map[*bls.PublicKey]indexedValidator)
nodeIDsToPublicKeys := make(map[ids.NodeID]*bls.PublicKey)
// TODO expose concrete type to avoid type casting
bitSetSignature, ok := message.Signature.(*warp.BitSetSignature)
if !ok {
return nil, nil, nil, false, errors.New("invalid warp signature type")
}

signerBitSet := set.BitsFromBytes(bitSetSignature.Signers)

nonSigners := make([]*bls.PublicKey, 0, len(validators))
aggregatedStakeWeight := new(big.Int)
totalStakeWeight := new(big.Int)
numRequests := 0
for i, v := range validators {
totalStakeWeight.Add(totalStakeWeight, new(big.Int).SetUint64(v.Weight))

// Only try to aggregate signatures from validators that are not already in
// the signer bit set
if signerBitSet.Contains(i) {
aggregatedStakeWeight.Add(aggregatedStakeWeight, new(big.Int).SetUint64(v.Weight))
continue
}

publicKeysToValidators[v.PublicKey] = indexedValidator{
Index: i,
Validator: v,
}

for _, nodeID := range v.NodeIDs {
numRequests += 1
nodeIDsToPublicKeys[nodeID] = v.PublicKey
}

nonSigners = append(nonSigners, v.PublicKey)
}

signatures := make([]*bls.Signature, 0, len(nonSigners)+1)
if bitSetSignature.Signature != [bls.SignatureLen]byte{} {
blsSignature, err := bls.SignatureFromBytes(bitSetSignature.Signature[:])
if err != nil {
return nil, nil, nil, false, fmt.Errorf("failed to parse bls signature: %w", err)
}
signatures = append(signatures, blsSignature)
}

// TODO something better than a buffered channel?
results := make(chan result, numRequests)
handler := responseHandler{
message: message,
publicKeysToValidators: publicKeysToValidators,
nodeIDsToPublicKeys: nodeIDsToPublicKeys,
results: results,
}

for _, pk := range nonSigners {
validator := publicKeysToValidators[pk]
for _, nodeID := range validator.NodeIDs {
if err := s.client.AppRequest(ctx, set.Of(nodeID), requestBytes, handler.HandleResponse); err != nil {
results <- result{NodeID: nodeID, Validator: validator, Err: err, ShouldClose: true}
}
}
}

minThreshold := new(big.Int).Mul(totalStakeWeight, new(big.Int).SetUint64(quorumNum))
minThreshold.Div(minThreshold, new(big.Int).SetUint64(quorumDen))

for i := 0; i < numRequests; i++ {
select {
case <-ctx.Done():
// Try to return whatever progress we have if the context is cancelled
msg, err := newWarpMessage(message, signerBitSet, signatures)
if err != nil {
return nil, nil, nil, false, err
}

return msg, aggregatedStakeWeight, totalStakeWeight, false, nil
case result := <-results:
if result.Err != nil {
s.log.Debug(
"dropping response",
zap.Stringer("nodeID", result.NodeID),
zap.Error(err),
)

if !result.ShouldClose {
continue
}

return nil, nil, nil, false, result.Err
}

if signerBitSet.Contains(result.Validator.Index) {
s.log.Debug(
"dropping duplicate signature",
zap.Stringer("nodeID", result.NodeID),
zap.Error(err),
)
continue
}

signatures = append(signatures, result.Signature)
signerBitSet.Add(result.Validator.Index)
aggregatedStakeWeight.Add(aggregatedStakeWeight, new(big.Int).SetUint64(result.Validator.Weight))

if aggregatedStakeWeight.Cmp(minThreshold) != -1 {
msg, err := newWarpMessage(message, signerBitSet, signatures)
if err != nil {
return nil, nil, nil, false, err
}

return msg, aggregatedStakeWeight, totalStakeWeight, true, nil
}
}
}

msg, err := newWarpMessage(message, signerBitSet, signatures)
if err != nil {
return nil, nil, nil, false, err
}

return msg, aggregatedStakeWeight, totalStakeWeight, true, nil
}

func newWarpMessage(
message *warp.Message,
signerBitSet set.Bits,
signatures []*bls.Signature,
) (*warp.Message, error) {
if len(signatures) == 0 {
return message, nil
}

aggregateSignature, err := bls.AggregateSignatures(signatures)
if err != nil {
return nil, err
}

bitSetSignature := &warp.BitSetSignature{
Signers: signerBitSet.Bytes(),
Signature: [bls.SignatureLen]byte{},
}
copy(bitSetSignature.Signature[:], bls.SignatureToBytes(aggregateSignature))

return warp.NewMessage(&message.UnsignedMessage, bitSetSignature)
}

type responseHandler struct {
message *warp.Message
publicKeysToValidators map[*bls.PublicKey]indexedValidator
nodeIDsToPublicKeys map[ids.NodeID]*bls.PublicKey
results chan result
}

func (r *responseHandler) HandleResponse(
_ context.Context,
nodeID ids.NodeID,
responseBytes []byte,
err error,
) {
validator := r.publicKeysToValidators[r.nodeIDsToPublicKeys[nodeID]]
if err != nil {
r.results <- result{NodeID: nodeID, Validator: validator, Err: err}
return
}

response := &sdk.SignatureResponse{}
if err := proto.Unmarshal(responseBytes, response); err != nil {
r.results <- result{NodeID: nodeID, Validator: validator, Err: err}
return
}

signature, err := bls.SignatureFromBytes(response.Signature)
if err != nil {
r.results <- result{NodeID: nodeID, Validator: validator, Err: err}
return
}

if !bls.Verify(validator.PublicKey, signature, r.message.UnsignedMessage.Bytes()) {
r.results <- result{NodeID: nodeID, Validator: validator, Err: errFailedVerification}
return
}

r.results <- result{NodeID: nodeID, Validator: validator, Signature: signature}
}
Loading
Loading