Skip to content

Commit

Permalink
Fix remaining length parse (#63)
Browse files Browse the repository at this point in the history
  • Loading branch information
at-wat authored Dec 29, 2019
1 parent b40f475 commit bd125da
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 2 deletions.
4 changes: 2 additions & 2 deletions serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ func (c *BaseClient) serve() error {
pktType := packetType(pktTypeBytes[0] & 0xF0)
pktFlag := pktTypeBytes[0] & 0x0F
var remainingLength int
for {
for shift := uint(0); ; shift += 7 {
b := make([]byte, 1)
if _, err := io.ReadFull(r, b); err != nil {
return err
}
remainingLength = (remainingLength << 7) | (int(b[0]) & 0x7F)
remainingLength |= (int(b[0]) & 0x7F) << shift
if !(b[0]&0x80 != 0) {
break
}
Expand Down
81 changes: 81 additions & 0 deletions serve_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2019 The mqtt-go authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mqtt

import (
"context"
"net"
"testing"
"time"
)

func TestRemainingLengthParse(t *testing.T) {
ca, cb := net.Pipe()
cli := &BaseClient{Transport: cb}

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
connected := make(chan struct{})
go func() {
if _, err := cli.Connect(ctx, "cli"); err != nil {
if ctx.Err() == nil {
t.Fatalf("Unexpected error: '%v'", err)
}
}
if cli.connState.String() != "Active" {
t.Errorf("State after Connect must be 'Active', but is '%s'", cli.connState)
}
close(connected)
}()

b := make([]byte, 100)
if _, err := ca.Read(b); err != nil {
t.Fatalf("Unexpected error: '%v'", err)
}

// Send CONNACK.
if _, err := ca.Write([]byte{
0x20, 0x02, 0x00, 0x00,
}); err != nil {
t.Fatalf("Unexpected error: '%v'", err)
}

select {
case <-connected:
case <-ctx.Done():
t.Fatal("Timeout")
}

chMsg := make(chan *Message)
cli.Handle(HandlerFunc(func(msg *Message) {
chMsg <- msg
}))

// Send PUBLISH from broker.
if _, err := ca.Write(
(&pktPublish{Message: &Message{Topic: "a", Payload: make([]byte, 256)}}).pack(),
); err != nil {
t.Fatalf("Unexpected error: ''%v''", err)
}

select {
case msg := <-chMsg:
if len(msg.Payload) != 256 {
t.Errorf("Expected message payload size: 256, got: %d", len(msg.Payload))
}
case <-ctx.Done():
t.Error("Timeout")
}
}

0 comments on commit bd125da

Please sign in to comment.