Skip to content

Commit

Permalink
Fix: tcp connection broken detection (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
linxGnu authored Jul 31, 2020
1 parent 5a5f416 commit 297995d
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 20 deletions.
10 changes: 10 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ func (c *Connection) SetReadDeadline(t time.Time) error {
return c.conn.SetReadDeadline(t)
}

// SetReadTimeout is equivalent to ReadDeadline(now + timeout)
func (c *Connection) SetReadTimeout(t time.Duration) error {
return c.conn.SetReadDeadline(time.Now().Add(t))
}

// SetWriteDeadline sets the deadline for future Write calls
// and any currently-blocked Write call.
// Even if write times out, it may return n > 0, indicating that
Expand All @@ -95,3 +100,8 @@ func (c *Connection) SetReadDeadline(t time.Time) error {
func (c *Connection) SetWriteDeadline(t time.Time) error {
return c.conn.SetWriteDeadline(t)
}

// SetWriteTimeout is equivalent to WriteDeadline(now + timeout)
func (c *Connection) SetWriteTimeout(t time.Duration) error {
return c.conn.SetWriteDeadline(time.Now().Add(t))
}
20 changes: 13 additions & 7 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,31 @@ func sendingAndReceiveSMS(wg *sync.WaitGroup) {
defer wg.Done()

auth := gosmpp.Auth{
SMSC: "127.0.0.1:2775",
SystemID: "522241",
Password: "UUDHWB",
SMSC: "smscsim.melroselabs.com:2775",
SystemID: "169994",
Password: "EDXPJU",
SystemType: "",
}

trans, err := gosmpp.NewTransceiverSession(gosmpp.NonTLSDialer, auth, gosmpp.TransceiveSettings{
EnquireLink: 5 * time.Second,

WriteTimeout: time.Second,

// this setting is very important to detect broken conn.
// After timeout, there is no read packet, then we decide it's connection broken.
ReadTimeout: 10 * time.Second,

OnSubmitError: func(p pdu.PDU, err error) {
log.Fatal(err)
log.Fatal("SubmitPDU error:", err)
},

OnReceivingError: func(err error) {
fmt.Println(err)
fmt.Println("Receiving PDU/Network error:", err)
},

OnRebindingError: func(err error) {
fmt.Println(err)
fmt.Println("Rebinding but error:", err)
},

OnPDU: handlePDU(),
Expand All @@ -59,7 +65,7 @@ func sendingAndReceiveSMS(wg *sync.WaitGroup) {
}()

// sending SMS(s)
for i := 0; i < 60; i++ {
for i := 0; i < 1800; i++ {
if err = trans.Transceiver().Submit(newSubmitSM()); err != nil {
fmt.Println(err)
}
Expand Down
25 changes: 24 additions & 1 deletion receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,17 @@ import (
"github.com/linxGnu/gosmpp/pdu"
)

const (
defaultReadTimeout = 2 * time.Second
)

// ReceiveSettings is event listener for Receiver.
type ReceiveSettings struct {
// Timeout represents conn read timeout.
// This field is very important to detect connection failure.
// Default: 2 secs
Timeout time.Duration

// OnPDU handles received PDU from SMSC.
//
// `Responded` flag indicates this pdu is responded automatically,
Expand All @@ -30,6 +39,12 @@ type ReceiveSettings struct {
response func(pdu.PDU)
}

func (s *ReceiveSettings) normalize() {
if s.Timeout <= 0 {
s.Timeout = defaultReadTimeout
}
}

type receiver struct {
ctx context.Context
cancel context.CancelFunc
Expand All @@ -45,6 +60,8 @@ func NewReceiver(conn *Connection, settings ReceiveSettings) Receiver {
}

func newReceiver(conn *Connection, settings ReceiveSettings, startDaemon bool) *receiver {
settings.normalize()

r := &receiver{
settings: settings,
conn: conn,
Expand Down Expand Up @@ -130,8 +147,14 @@ func (t *receiver) loop() {
default:
}

p, err := pdu.Parse(t.conn)
// read pdu from conn
var p pdu.PDU
err := t.conn.SetReadTimeout(t.settings.Timeout)
if err == nil {
p, err = pdu.Parse(t.conn)
}

// check error
if closeOnError := t.check(err); closeOnError || t.handleOrClose(p) {
if closeOnError {
t.closing(InvalidStreaming)
Expand Down
25 changes: 24 additions & 1 deletion state.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const (
StoppingProcessOnly

// InvalidStreaming indicates Transceiver/Receiver data reading state is
// invalid due to network connection/ or SMSC responsed with an invalid PDU
// invalid due to network connection or SMSC responsed with an invalid PDU
// which potentially damages other following PDU(s).
//
// In both cases, Transceiver/Receiver is closed implicitly.
Expand All @@ -25,3 +25,26 @@ const (
// UnbindClosing indicates Receiver got unbind request from SMSC and closed due to this request.
UnbindClosing
)

// String interface.
func (s *State) String() string {
switch *s {
case ExplicitClosing:
return "ExplicitClosing"

case StoppingProcessOnly:
return "StoppingProcessOnly"

case InvalidStreaming:
return "InvalidStreaming"

case ConnectionIssue:
return "ConnectionIssue"

case UnbindClosing:
return "UnbindClosing"

default:
return ""
}
}
21 changes: 17 additions & 4 deletions transceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,16 @@ import (

// TransceiveSettings is listener for Transceiver.
type TransceiveSettings struct {
// WriteTimeout is timeout/deadline for submitting PDU.
// WriteTimeout is timeout for submitting PDU.
WriteTimeout time.Duration

// ReadTimeout is timeout for reading PDU from SMSC.
// Underlying net.Conn will be stricted with ReadDeadline(now + timeout).
// This setting is very important to detect connection failure.
//
// Default: 2 secs
ReadTimeout time.Duration

// EnquireLink periodically sends EnquireLink to SMSC.
// Zero duration means disable auto enquire link.
EnquireLink time.Duration
Expand Down Expand Up @@ -52,9 +59,12 @@ func NewTransceiver(conn *Connection, settings TransceiveSettings) Transceiver {
}

t.out = newTransmitter(conn, TransmitSettings{
WriteTimeout: settings.WriteTimeout,
EnquireLink: settings.EnquireLink,
Timeout: settings.WriteTimeout,

EnquireLink: settings.EnquireLink,

OnSubmitError: settings.OnSubmitError,

OnClosed: func(state State) {
switch state {
case ExplicitClosing:
Expand All @@ -72,7 +82,10 @@ func NewTransceiver(conn *Connection, settings TransceiveSettings) Transceiver {
}, false)

t.in = newReceiver(conn, ReceiveSettings{
OnPDU: settings.OnPDU,
Timeout: settings.ReadTimeout,

OnPDU: settings.OnPDU,

OnReceivingError: settings.OnReceivingError,

OnClosed: func(state State) {
Expand Down
22 changes: 17 additions & 5 deletions transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ var (

// TransmitSettings is listener for transmitter.
type TransmitSettings struct {
// WriteTimeout is timeout/deadline for submitting PDU.
WriteTimeout time.Duration
// Timeout is timeout/deadline for submitting PDU.
Timeout time.Duration

// EnquireLink periodically sends EnquireLink to SMSC.
// The duration must not be smaller than 1 minute.
Expand All @@ -41,6 +41,12 @@ type TransmitSettings struct {
OnClosed func(State)
}

func (s *TransmitSettings) normalize() {
if s.EnquireLink <= EnquireLinkIntervalMinimum {
s.EnquireLink = EnquireLinkIntervalMinimum
}
}

type transmitter struct {
ctx context.Context
cancel context.CancelFunc
Expand All @@ -58,6 +64,8 @@ func NewTransmitter(conn *Connection, settings TransmitSettings) Transmitter {
}

func newTransmitter(conn *Connection, settings TransmitSettings, startDaemon bool) *transmitter {
settings.normalize()

t := &transmitter{
settings: settings,
conn: conn,
Expand Down Expand Up @@ -235,16 +243,20 @@ func (t *transmitter) check(p pdu.PDU, n int, err error) (closing bool) {

// low level writing
func (t *transmitter) write(v []byte) (n int, err error) {
hasTimeout := t.settings.WriteTimeout > 0
hasTimeout := t.settings.Timeout > 0

if hasTimeout {
_ = t.conn.SetWriteDeadline(time.Now().Add(t.settings.WriteTimeout))
if err = t.conn.SetWriteTimeout(t.settings.Timeout); err != nil {
return
}
}

if n, err = t.conn.Write(v); err != nil && n == 0 {
// retry again with double timeout
if hasTimeout {
_ = t.conn.SetWriteDeadline(time.Now().Add(t.settings.WriteTimeout << 1))
if err = t.conn.SetWriteTimeout(t.settings.Timeout << 1); err != nil {
return
}
}

n, err = t.conn.Write(v)
Expand Down
5 changes: 3 additions & 2 deletions transmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ func TestTransmitter(t *testing.T) {
t.Run("binding", func(t *testing.T) {
auth := nextAuth()
transmitter, err := NewTransmitterSession(NonTLSDialer, auth, TransmitSettings{
WriteTimeout: time.Second,
Timeout: time.Second,

OnSubmitError: func(p pdu.PDU, err error) {
t.Fatal(err)
},
Expand Down Expand Up @@ -77,7 +78,7 @@ func TestTransmitter(t *testing.T) {
_, ok := p.(*pdu.CancelSM)
require.True(t, ok)
}
tr.settings.WriteTimeout = 500 * time.Millisecond
tr.settings.Timeout = 500 * time.Millisecond

// do trigger
trigger(&tr)
Expand Down

0 comments on commit 297995d

Please sign in to comment.