Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gbn+mailbox: cleanup & prefixed logger #90

Merged
merged 4 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions gbn/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package gbn

import "time"

// config holds the configuration values for an instance of GoBackNConn.
type config struct {
ellemouton marked this conversation as resolved.
Show resolved Hide resolved
// n is the window size. The sender can send a maximum of n packets
// before requiring an ack from the receiver for the first packet in
// the window. The value of n is chosen by the client during the
// GoBN handshake.
n uint8

// s is the maximum sequence number used to label packets. Packets
// are labelled with incrementing sequence numbers modulo s.
// s must be strictly larger than the window size, n. This
// is so that the receiver can tell if the sender is resending the
// previous window (maybe the sender did not receive the acks) or if
// they are sending the next window. If s <= n then there would be
// no way to tell.
s uint8

// maxChunkSize is the maximum payload size in bytes allowed per
// message. If the payload to be sent is larger than maxChunkSize then
// the payload will be split between multiple packets.
// If maxChunkSize is zero then it is disabled and data won't be split
// between packets.
maxChunkSize int

// resendTimeout is the duration that will be waited before resending
// the packets in the current queue.
resendTimeout time.Duration

// recvFromStream is the function that will be used to acquire the next
// available packet.
recvFromStream recvBytesFunc

// sendToStream is the function that will be used to send over our next
// packet.
sendToStream sendBytesFunc

// handshakeTimeout is the time after which the server or client
// will abort and restart the handshake if the expected response is
// not received from the peer.
handshakeTimeout time.Duration

pingTime time.Duration
pongTime time.Duration
}

// newConfig constructs a new config struct.
func newConfig(sendFunc sendBytesFunc, recvFunc recvBytesFunc,
n uint8) *config {

return &config{
n: n,
s: n + 1,
recvFromStream: recvFunc,
sendToStream: sendFunc,
resendTimeout: defaultResendTimeout,
handshakeTimeout: defaultHandshakeTimeout,
}
}
38 changes: 22 additions & 16 deletions gbn/gbn_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ func NewClientConn(ctx context.Context, n uint8, sendFunc sendBytesFunc,
math.MaxUint8)
}

conn := newGoBackNConn(ctx, sendFunc, receiveFunc, false, n)
cfg := newConfig(sendFunc, receiveFunc, n)

// Apply functional options
for _, o := range opts {
o(conn)
o(cfg)
}

conn := newGoBackNConn(ctx, cfg, "client")

if err := conn.clientHandshake(); err != nil {
if err := conn.Close(); err != nil {
log.Errorf("error closing gbn ClientConn: %v", err)
conn.log.Errorf("error closing gbn ClientConn: %v", err)
}
return nil, err
}
Expand Down Expand Up @@ -76,7 +78,7 @@ func (g *GoBackNConn) clientHandshake() error {
case <-recvNext:
}

b, err := g.recvFromStream(g.ctx)
b, err := g.cfg.recvFromStream(g.ctx)
if err != nil {
errChan <- err
return
Expand All @@ -101,21 +103,22 @@ func (g *GoBackNConn) clientHandshake() error {
handshake:
for {
// start Handshake
msg := &PacketSYN{N: g.n}
msg := &PacketSYN{N: g.cfg.n}
msgBytes, err := msg.Serialize()
if err != nil {
return err
}

// Send SYN
log.Debugf("Client sending SYN")
if err := g.sendToStream(g.ctx, msgBytes); err != nil {
g.log.Debugf("Sending SYN")
ellemouton marked this conversation as resolved.
Show resolved Hide resolved
if err := g.cfg.sendToStream(g.ctx, msgBytes); err != nil {
return err
}

for {
// Wait for SYN
log.Debugf("Client waiting for SYN")
g.log.Debugf("Waiting for SYN")

select {
case recvNext <- 1:
case <-g.quit:
Expand All @@ -127,8 +130,10 @@ handshake:

var b []byte
select {
case <-time.After(g.handshakeTimeout):
log.Debugf("SYN resendTimeout. Resending SYN.")
case <-time.After(g.cfg.handshakeTimeout):
g.log.Debugf("SYN resendTimeout. Resending " +
"SYN.")

continue handshake
case <-g.quit:
return nil
Expand All @@ -144,7 +149,8 @@ handshake:
return err
}

log.Debugf("Client got %T", resp)
g.log.Debugf("Got %T", resp)

switch r := resp.(type) {
case *PacketSYN:
respSYN = r
Expand All @@ -159,24 +165,24 @@ handshake:
}
}

log.Debugf("Client got SYN")
g.log.Debugf("Got SYN")

if respSYN.N != g.n {
if respSYN.N != g.cfg.n {
return io.EOF
}

// Send SYNACK
log.Debugf("Client sending SYNACK")
g.log.Debugf("Sending SYNACK")
synack, err := new(PacketSYNACK).Serialize()
if err != nil {
return err
}

if err := g.sendToStream(g.ctx, synack); err != nil {
if err := g.cfg.sendToStream(g.ctx, synack); err != nil {
return err
}

log.Debugf("Client Handshake complete")
g.log.Debugf("Handshake complete")

return nil
}
Loading
Loading