Skip to content

Commit

Permalink
Internal sendPacket reimplemented using io.Reader. Data restoration f…
Browse files Browse the repository at this point in the history
…unction removed.

Handler's OnReceive use io.Reader to pass received data.
Tests updated. Mocks regenerated.
  • Loading branch information
alexeykiselev committed Dec 27, 2024
1 parent 16a32cb commit 63ade4e
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 59 deletions.
4 changes: 3 additions & 1 deletion pkg/networking/handler.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package networking

import "io"

// Handler is an interface for handling new messages, handshakes and session close events.
type Handler interface {
// OnReceive fired on new message received.
OnReceive(*Session, []byte)
OnReceive(*Session, io.Reader)

// OnHandshake fired on new Handshake received.
OnHandshake(*Session, Handshake)
Expand Down
20 changes: 11 additions & 9 deletions pkg/networking/mocks/handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/networking/mocks/header.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions pkg/networking/mocks/protocol.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 17 additions & 40 deletions pkg/networking/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,8 @@ func (s *Session) waitForSend(data []byte) error {
if s.logger.Enabled(s.ctx, slog.LevelDebug) {
s.logger.Debug("Sending data", "data", base64.StdEncoding.EncodeToString(data))
}
ready := &sendPacket{data: data, err: errCh}
select {
case s.sendCh <- ready:
case s.sendCh <- newSendPacket(data, errCh):
s.logger.Debug("Data written into send channel")
case <-s.ctx.Done():
s.logger.Debug("Session shutdown while sending data")
Expand All @@ -174,24 +173,6 @@ func (s *Session) waitForSend(data []byte) error {
return ErrConnectionWriteTimeout
}

dataCopy := func() {
if len(data) == 0 {
return // An empty data is ignored.
}

// In the event of session shutdown or connection write timeout, we need to prevent `send` from reading
// the body buffer after returning from this function since the caller may re-use the underlying array.
ready.mu.Lock()
defer ready.mu.Unlock()

if ready.data == nil {
return // data was already copied in `send`.
}
newData := make([]byte, len(data))
copy(newData, data)
ready.data = newData
}

select {
case err, ok := <-errCh:
if !ok {
Expand All @@ -201,11 +182,9 @@ func (s *Session) waitForSend(data []byte) error {
s.logger.Debug("Error sending data", "error", err)
return err
case <-s.ctx.Done():
dataCopy()
s.logger.Debug("Session shutdown while waiting send error")
return ErrSessionShutdown
case <-timer.C:
dataCopy()
s.logger.Debug("Connection write timeout while waiting send error")
return ErrConnectionWriteTimeout
}
Expand All @@ -224,22 +203,16 @@ func (s *Session) sendLoop() error {

case packet := <-s.sendCh:
packet.mu.Lock()
_, rErr := dataBuf.ReadFrom(packet.r)
if rErr != nil {
packet.mu.Unlock()
s.logger.Error("Failed to copy data into buffer", "error", rErr)
s.asyncSendErr(packet.err, rErr)
return rErr
}
if s.logger.Enabled(s.ctx, slog.LevelDebug) {
s.logger.Debug("Sending data to connection",
"data", base64.StdEncoding.EncodeToString(packet.data))
}
if len(packet.data) != 0 {
// Copy the data into the buffer to avoid holding a mutex lock during the writing.
_, err := dataBuf.Write(packet.data)
if err != nil {
packet.data = nil
packet.mu.Unlock()
s.logger.Error("Failed to copy data into buffer", "error", err)
s.asyncSendErr(packet.err, err)
return err // TODO: Do we need to return here?
}
s.logger.Debug("Data copied into buffer")
packet.data = nil
"data", base64.StdEncoding.EncodeToString(dataBuf.Bytes()))
}
packet.mu.Unlock()

Expand Down Expand Up @@ -375,7 +348,7 @@ func (s *Session) readMessagePayload(hdr Header, conn io.Reader) error {
s.logger.Debug("Invoking OnReceive handler", "message",
base64.StdEncoding.EncodeToString(s.receiveBuffer.Bytes()))
}
s.config.handler.OnReceive(s, s.receiveBuffer.Bytes()) // Invoke OnReceive handler.
s.config.handler.OnReceive(s, bytes.NewReader(s.receiveBuffer.Bytes())) // Invoke OnReceive handler.
return nil
}

Expand Down Expand Up @@ -405,9 +378,13 @@ func (s *Session) keepaliveLoop() error {

// sendPacket is used to send data.
type sendPacket struct {
mu sync.Mutex // Protects data from unsafe reads.
data []byte
err chan<- error
mu sync.Mutex // Protects data from unsafe reads.
r io.Reader
err chan<- error
}

func newSendPacket(data []byte, ch chan<- error) *sendPacket {
return &sendPacket{r: bytes.NewReader(data), err: ch}
}

// asyncSendErr is used to try an async send of an error.
Expand Down
6 changes: 4 additions & 2 deletions pkg/networking/session_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package networking_test

import (
"bytes"
"context"
"encoding/binary"
"errors"
Expand Down Expand Up @@ -55,7 +56,8 @@ func TestSuccessfulSession(t *testing.T) {
require.NoError(t, wErr)
assert.Equal(t, 5, n)
})
sc2 := serverHandler.On("OnReceive", ss, encodeMessage("Hello session")).Once().Return()
sc2 := serverHandler.On("OnReceive", ss, bytes.NewReader(encodeMessage("Hello session"))).
Once().Return()
sc2.NotBefore(sc1).
Run(func(_ mock.Arguments) {
n, wErr := ss.Write(encodeMessage("Hi"))
Expand All @@ -73,7 +75,7 @@ func TestSuccessfulSession(t *testing.T) {
require.NoError(t, wErr)
assert.Equal(t, 17, n)
})
cl2 := clientHandler.On("OnReceive", cs, encodeMessage("Hi")).Once().Return()
cl2 := clientHandler.On("OnReceive", cs, bytes.NewReader(encodeMessage("Hi"))).Once().Return()
cl2.NotBefore(cl1).
Run(func(_ mock.Arguments) {
cWG.Done()
Expand Down

0 comments on commit 63ade4e

Please sign in to comment.