Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: remove ws net proto 2.1 #6081

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 10 additions & 45 deletions network/msgCompressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,6 @@

const zstdCompressionLevel = zstd.BestSpeed

// checkCanCompress checks if there is an proposal payload message and peers supporting compression
func checkCanCompress(request broadcastRequest, peers []*wsPeer) bool {
canCompress := false
hasPP := false
for _, tag := range request.tags {
if tag == protocol.ProposalPayloadTag {
hasPP = true
break
}
}
// if have proposal payload check if there are any peers supporting compression
if hasPP {
for _, peer := range peers {
if peer.pfProposalCompressionSupported() {
canCompress = true
break
}
}
}
return canCompress
}

// zstdCompressMsg returns a concatenation of a tag and compressed data
func zstdCompressMsg(tbytes []byte, d []byte) ([]byte, string) {
bound := zstd.CompressBound(len(d))
Expand Down Expand Up @@ -89,13 +67,7 @@
ppdec zstdProposalDecompressor
}

type zstdProposalDecompressor struct {
active bool
}

func (dec zstdProposalDecompressor) enabled() bool {
return dec.active
}
type zstdProposalDecompressor struct{}

func (dec zstdProposalDecompressor) accept(data []byte) bool {
return len(data) > 4 && bytes.Equal(data[:4], zstdCompressionMagic[:])
Expand Down Expand Up @@ -126,18 +98,16 @@

func (c *wsPeerMsgDataConverter) convert(tag protocol.Tag, data []byte) ([]byte, error) {
if tag == protocol.ProposalPayloadTag {
if c.ppdec.enabled() {
// sender might support compressed payload but fail to compress for whatever reason,
// in this case it sends non-compressed payload - the receiver decompress only if it is compressed.
if c.ppdec.accept(data) {
res, err := c.ppdec.convert(data)
if err != nil {
return nil, fmt.Errorf("peer %s: %w", c.origin, err)
}
return res, nil
// sender might support compressed payload but fail to compress for whatever reason,
// in this case it sends non-compressed payload - the receiver decompress only if it is compressed.
if c.ppdec.accept(data) {
res, err := c.ppdec.convert(data)
if err != nil {
gmalouf marked this conversation as resolved.
Show resolved Hide resolved
return nil, fmt.Errorf("peer %s: %w", c.origin, err)

Check warning on line 106 in network/msgCompressor.go

View check run for this annotation

Codecov / codecov/patch

network/msgCompressor.go#L106

Added line #L106 was not covered by tests
}
c.log.Warnf("peer %s supported zstd but sent non-compressed data", c.origin)
return res, nil
}
c.log.Warnf("peer %s supported zstd but sent non-compressed data", c.origin)
}
return data, nil
}
Expand All @@ -148,11 +118,6 @@
origin: wp.originAddress,
}

if wp.pfProposalCompressionSupported() {
c.ppdec = zstdProposalDecompressor{
active: true,
}
}

c.ppdec = zstdProposalDecompressor{}
return &c
}
39 changes: 2 additions & 37 deletions network/msgCompressor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,37 +48,6 @@ func TestZstdDecompress(t *testing.T) {
require.Nil(t, decompressed)
}

func TestCheckCanCompress(t *testing.T) {
partitiontest.PartitionTest(t)

req := broadcastRequest{}
peers := []*wsPeer{}
r := checkCanCompress(req, peers)
require.False(t, r)

req.tags = []protocol.Tag{protocol.AgreementVoteTag}
r = checkCanCompress(req, peers)
require.False(t, r)

req.tags = []protocol.Tag{protocol.AgreementVoteTag, protocol.ProposalPayloadTag}
r = checkCanCompress(req, peers)
require.False(t, r)

peer1 := wsPeer{
features: 0,
}
peers = []*wsPeer{&peer1}
r = checkCanCompress(req, peers)
require.False(t, r)

peer2 := wsPeer{
features: pfCompressedProposal,
}
peers = []*wsPeer{&peer1, &peer2}
r = checkCanCompress(req, peers)
require.True(t, r)
}

func TestZstdCompressMsg(t *testing.T) {
partitiontest.PartitionTest(t)

Expand Down Expand Up @@ -108,7 +77,7 @@ func TestWsPeerMsgDataConverterConvert(t *testing.T) {
partitiontest.PartitionTest(t)

c := wsPeerMsgDataConverter{}
c.ppdec = zstdProposalDecompressor{active: false}
c.ppdec = zstdProposalDecompressor{}
tag := protocol.AgreementVoteTag
data := []byte("data")

Expand All @@ -117,13 +86,9 @@ func TestWsPeerMsgDataConverterConvert(t *testing.T) {
require.Equal(t, data, r)

tag = protocol.ProposalPayloadTag
r, err = c.convert(tag, data)
require.NoError(t, err)
require.Equal(t, data, r)

l := converterTestLogger{}
c.log = &l
c.ppdec = zstdProposalDecompressor{active: true}
c.ppdec = zstdProposalDecompressor{}
r, err = c.convert(tag, data)
require.NoError(t, err)
require.Equal(t, data, r)
Expand Down
69 changes: 12 additions & 57 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,6 @@
var incomingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_incoming_peers", Description: "Number of active incoming peers."})
var outgoingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_outgoing_peers", Description: "Number of active outgoing peers."})

var networkPrioBatchesPPWithCompression = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_batches_wpp_comp_sent_total", Description: "number of prio compressed batches with PP"})
var networkPrioBatchesPPWithoutCompression = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_pp_prio_batches_wpp_non_comp_sent_total", Description: "number of prio non-compressed batches with PP"})
var networkPrioPPCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_compressed_size_total", Description: "cumulative size of all compressed PP"})
gmalouf marked this conversation as resolved.
Show resolved Hide resolved
var networkPrioPPNonCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_non_compressed_size_total", Description: "cumulative size of all non-compressed PP"})

// peerDisconnectionAckDuration defines the time we would wait for the peer disconnection to complete.
const peerDisconnectionAckDuration = 5 * time.Second

Expand Down Expand Up @@ -1062,6 +1057,7 @@
wn.setHeaders(responseHeader)
responseHeader.Set(ProtocolVersionHeader, matchingVersion)
responseHeader.Set(GenesisHeader, wn.GenesisID)
// set the features we support
responseHeader.Set(PeerFeaturesHeader, PeerFeatureProposalCompression)
var challenge string
if wn.prioScheme != nil {
Expand Down Expand Up @@ -1391,21 +1387,10 @@
}

// preparePeerData prepares batches of data for sending.
// It performs optional zstd compression for proposal massages
func (wn *msgBroadcaster) preparePeerData(request broadcastRequest, prio bool, peers []*wsPeer) ([][]byte, [][]byte, []crypto.Digest, bool) {
// determine if there is a payload proposal and peers supporting compressed payloads
wantCompression := false
containsPrioPPTag := false
if prio {
wantCompression = checkCanCompress(request, peers)
}

// It performs zstd compression for proposal massages if they this is a prio request and has proposal.
func (wn *msgBroadcaster) preparePeerData(request broadcastRequest, prio bool) ([][]byte, []crypto.Digest) {
digests := make([]crypto.Digest, len(request.data))
data := make([][]byte, len(request.data))
var dataCompressed [][]byte
if wantCompression {
dataCompressed = make([][]byte, len(request.data))
}
for i, d := range request.data {
tbytes := []byte(request.tags[i])
mbytes := make([]byte, len(tbytes)+len(d))
Expand All @@ -1416,29 +1401,15 @@
digests[i] = crypto.Hash(mbytes)
}

if prio {
if request.tags[i] == protocol.ProposalPayloadTag {
networkPrioPPNonCompressedSize.AddUint64(uint64(len(d)), nil)
containsPrioPPTag = true
}
}

if wantCompression {
if request.tags[i] == protocol.ProposalPayloadTag {
compressed, logMsg := zstdCompressMsg(tbytes, d)
if len(logMsg) > 0 {
wn.log.Warn(logMsg)
} else {
networkPrioPPCompressedSize.AddUint64(uint64(len(compressed)), nil)
}
dataCompressed[i] = compressed
} else {
// otherwise reuse non-compressed from above
dataCompressed[i] = mbytes
if prio && request.tags[i] == protocol.ProposalPayloadTag {
compressed, logMsg := zstdCompressMsg(tbytes, d)
if len(logMsg) > 0 {
wn.log.Warn(logMsg)

Check warning on line 1407 in network/wsNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/wsNetwork.go#L1407

Added line #L1407 was not covered by tests
}
data[i] = compressed
}
}
return data, dataCompressed, digests, containsPrioPPTag
return data, digests
}

// prio is set if the broadcast is a high-priority broadcast.
Expand All @@ -1455,7 +1426,7 @@
}

start := time.Now()
data, dataWithCompression, digests, containsPrioPPTag := wn.preparePeerData(request, prio, peers)
data, digests := wn.preparePeerData(request, prio)

// first send to all the easy outbound peers who don't block, get them started.
sentMessageCount := 0
Expand All @@ -1466,23 +1437,7 @@
if peer == request.except {
continue
}
var ok bool
if peer.pfProposalCompressionSupported() && len(dataWithCompression) > 0 {
// if this peer supports compressed proposals and compressed data batch is filled out, use it
ok = peer.writeNonBlockMsgs(request.ctx, dataWithCompression, prio, digests, request.enqueueTime)
if prio {
if containsPrioPPTag {
networkPrioBatchesPPWithCompression.Inc(nil)
}
}
} else {
ok = peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime)
if prio {
if containsPrioPPTag {
networkPrioBatchesPPWithoutCompression.Inc(nil)
}
}
}
ok := peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime)
if ok {
sentMessageCount++
continue
Expand Down Expand Up @@ -1951,7 +1906,7 @@
const ProtocolAcceptVersionHeader = "X-Algorand-Accept-Version"

// SupportedProtocolVersions contains the list of supported protocol versions by this node ( in order of preference ).
var SupportedProtocolVersions = []string{"2.2", "2.1"}
var SupportedProtocolVersions = []string{"2.2"}
gmalouf marked this conversation as resolved.
Show resolved Hide resolved

// ProtocolVersion is the current version attached to the ProtocolVersionHeader header
/* Version history:
Expand Down
46 changes: 12 additions & 34 deletions network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,15 +429,12 @@ func TestWebsocketProposalPayloadCompression(t *testing.T) {
}

var tests []testDef = []testDef{
// two old nodes
{[]string{"2.1"}, "2.1", []string{"2.1"}, "2.1"},

// two new nodes with overwritten config
{[]string{"2.2"}, "2.2", []string{"2.2"}, "2.2"},

// old node + new node
{[]string{"2.1"}, "2.1", []string{"2.2", "2.1"}, "2.2"},
{[]string{"2.2", "2.1"}, "2.2", []string{"2.1"}, "2.1"},
{[]string{"2.2", "2.1"}, "2.1", []string{"2.2"}, "2.2"},
gmalouf marked this conversation as resolved.
Show resolved Hide resolved

// combinations
{[]string{"2.2", "2.1"}, "2.1", []string{"2.2", "2.1"}, "2.1"},
Expand Down Expand Up @@ -1101,7 +1098,7 @@ func TestDupFilter(t *testing.T) {
defer netC.Stop()

makeMsg := func(n int) []byte {
// We cannot harcode the msgSize to messageFilterSize + 1 because max allowed AV message is smaller than that.
// We cannot hardcode the msgSize to messageFilterSize + 1 because max allowed AV message is smaller than that.
// We also cannot use maxSize for PP since it's a compressible tag but trying to compress random data will expand it.
if messageFilterSize+1 < n {
n = messageFilterSize + 1
Expand Down Expand Up @@ -1387,7 +1384,7 @@ func TestPeeringWithIdentityChallenge(t *testing.T) {
assert.Equal(t, 0, len(netB.GetPeers(PeersConnectedOut)))
// netA never attempts to set identity as it never sees a verified identity
assert.Equal(t, 1, netA.identityTracker.(*mockIdentityTracker).getSetCount())
// no connecton => netB does attepmt to add the identity to the tracker
// no connection => netB does attempt to add the identity to the tracker
// and it would not end up being added
assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getSetCount())
assert.Equal(t, 1, netB.identityTracker.(*mockIdentityTracker).getInsertCount())
Expand Down Expand Up @@ -1608,7 +1605,7 @@ func TestPeeringReceiverIdentityChallengeOnly(t *testing.T) {
assert.Equal(t, 0, netB.identityTracker.(*mockIdentityTracker).getSetCount())
}

// TestPeeringIncorrectDeduplicationName confirm that if the reciever can't match
// TestPeeringIncorrectDeduplicationName confirm that if the receiver can't match
// the Address in the challenge to its PublicAddress, identities aren't exchanged, but peering continues
func TestPeeringIncorrectDeduplicationName(t *testing.T) {
partitiontest.PartitionTest(t)
Expand Down Expand Up @@ -1665,7 +1662,7 @@ func TestPeeringIncorrectDeduplicationName(t *testing.T) {

// bi-directional connection would now work since netB detects to be connected to netA in tryConnectReserveAddr,
// so force it.
// this second connection should set identities, because the reciever address matches now
// this second connection should set identities, because the receiver address matches now
_, ok = netB.tryConnectReserveAddr(addrA)
assert.False(t, ok)
netB.wg.Add(1)
Expand Down Expand Up @@ -2504,9 +2501,9 @@ func TestWebsocketNetwork_checkServerResponseVariables(t *testing.T) {
}

func (wn *WebsocketNetwork) broadcastWithTimestamp(tag protocol.Tag, data []byte, when time.Time) error {
msgArr := make([][]byte, 1, 1)
msgArr := make([][]byte, 1)
msgArr[0] = data
tagArr := make([]protocol.Tag, 1, 1)
tagArr := make([]protocol.Tag, 1)
tagArr[0] = tag
request := broadcastRequest{tags: tagArr, data: msgArr, enqueueTime: when, ctx: context.Background()}

Expand Down Expand Up @@ -3711,48 +3708,29 @@ func TestPreparePeerData(t *testing.T) {
data: [][]byte{[]byte("test"), []byte("data")},
}

peers := []*wsPeer{}
wn := WebsocketNetwork{}
data, comp, digests, seenPrioPPTag := wn.broadcaster.preparePeerData(req, false, peers)
data, digests := wn.broadcaster.preparePeerData(req, false)
require.NotEmpty(t, data)
require.Empty(t, comp)
require.NotEmpty(t, digests)
require.Equal(t, len(req.data), len(digests))
require.Equal(t, len(data), len(digests))
require.False(t, seenPrioPPTag)

for i := range data {
require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i])
}

// compression
peer1 := wsPeer{
features: 0,
}
peer2 := wsPeer{
features: pfCompressedProposal,
}
peers = []*wsPeer{&peer1, &peer2}
data, comp, digests, seenPrioPPTag = wn.broadcaster.preparePeerData(req, true, peers)
data, digests = wn.broadcaster.preparePeerData(req, true)
require.NotEmpty(t, data)
require.NotEmpty(t, comp)
require.NotEmpty(t, digests)
require.Equal(t, len(req.data), len(digests))
require.Equal(t, len(data), len(digests))
require.Equal(t, len(comp), len(digests))
require.True(t, seenPrioPPTag)

for i := range data {
require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i])
}

for i := range comp {
if req.tags[i] != protocol.ProposalPayloadTag {
require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), comp[i])
require.Equal(t, data[i], comp[i])
require.Equal(t, append([]byte(req.tags[i]), req.data[i]...), data[i])
require.Equal(t, data[i], data[i])
} else {
require.NotEqual(t, data[i], comp[i])
require.Equal(t, append([]byte(req.tags[i]), zstdCompressionMagic[:]...), comp[i][:len(req.tags[i])+len(zstdCompressionMagic)])
require.Equal(t, append([]byte(req.tags[i]), zstdCompressionMagic[:]...), data[i][:len(req.tags[i])+len(zstdCompressionMagic)])
}
}
}
Expand Down
Loading
Loading