Skip to content

Commit

Permalink
Merge pull request #21 from szibis/backoff
Browse files Browse the repository at this point in the history
Add exponential backoff with retries for TCP dial
  • Loading branch information
jjneely authored Apr 26, 2017
2 parents 797199a + 658e09b commit 9e07749
Show file tree
Hide file tree
Showing 9 changed files with 418 additions and 18 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ Command synopsis:
-packetlen="1400": Max packet length. Must be lower than MTU plus IPv4 and UDP headers to avoid fragmentation.
-sendproto="UDP": IP Protocol for sending data: TCP, UDP, or TEST
-tcptimeout="1s": Timeout for TCP client remote connections
-backoff-retries="3": Maximum number of retries in backoff for TCP dial when sendproto set to TCP
-backoff-min="50ms": Backoff minimal (integer) time in Millisecond
-backoff-max="1s": Backoff maximal (integer) time in Millisecond
-backoff-factor="1.5": Backoff factor (float)
-pprof=false: Golang profiling support
-pprof-bind=":8080": Listen host:port for HTTP pprof data
-verbose=false: Verbose output
Expand Down
6 changes: 6 additions & 0 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package: github.com/jjneely/statsrelay
imports:
- package: github.com/jpillora/backoff
1 change: 1 addition & 0 deletions src
68 changes: 50 additions & 18 deletions statsrelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
"github.com/jpillora/backoff"
"io/ioutil"
"log"
"net"
Expand Down Expand Up @@ -89,6 +90,18 @@ var profilingBind string
// maxprocs int value to set GOMAXPROCS
var maxprocs int

// TCPMaxRetries int value for number of retries in dial tcp
var TCPMaxRetries int

// TCPMinBackoff duration value for minimal backoff limit time
var TCPMinBackoff time.Duration

// TCPMaxBackoff duration value for maximum backoff limit time
var TCPMaxBackoff time.Duration

// TCPFactorBackoff float64 value for backoff factor
var TCPFactorBackoff float64

// sockBufferMaxSize() returns the maximum size that the UDP receive buffer
// in the kernel can be set to. In bytes.
func getSockBufferMaxSize() (int, error) {
Expand Down Expand Up @@ -161,7 +174,7 @@ func genTags(metric, metricTags string) string {

// sendPacket takes a []byte and writes that directly to a UDP socket
// that was assigned for target.
func sendPacket(buff []byte, target string, sendproto string, TCPtimeout time.Duration) {
func sendPacket(buff []byte, target string, sendproto string, TCPtimeout time.Duration, boff *backoff.Backoff) {
switch sendproto {
case "UDP":
conn, err := net.ListenUDP("udp", nil)
Expand All @@ -171,17 +184,19 @@ func sendPacket(buff []byte, target string, sendproto string, TCPtimeout time.Du
conn.WriteToUDP(buff, udpAddr[target])
conn.Close()
case "TCP":
tcpAddr, err := net.ResolveTCPAddr("tcp", target)
if err != nil {
log.Fatalf("ResolveTCPAddr Failed %s\n", err.Error())
}
conn, err := net.DialTimeout("tcp", target, TCPtimeout)
if err != nil {
log.Printf("TCP error for %s - %s\n", tcpAddr, err)
break
for i := 0; i < TCPMaxRetries; i++ {
conn, err := net.DialTimeout("tcp", target, TCPtimeout)
if err != nil {
doff := boff.Duration()
log.Printf("TCP error for %s - %s [Reconnecting in %s, retries left %d/%d]\n",
target, err, doff, TCPMaxRetries-i, TCPMaxRetries)
time.Sleep(doff)
continue
}
boff.Reset()
conn.Write(buff)
defer conn.Close()
}
conn.Write(buff)
defer conn.Close()
case "TEST":
if verbose {
log.Printf("Debug: Would have sent packet of %d bytes to %s",
Expand Down Expand Up @@ -216,6 +231,13 @@ func handleBuff(buff []byte) {
numMetrics := 0
statsMetric := prefix + ".statsProcessed"

boff := &backoff.Backoff{
Min: TCPMinBackoff,
Max: TCPMaxBackoff,
Factor: TCPFactorBackoff,
Jitter: false,
}

for offset := 0; offset < len(buff); {
loop:
for offset < len(buff) {
Expand All @@ -233,7 +255,6 @@ func handleBuff(buff []byte) {
}

size := bytes.IndexByte(buff[offset:], '\n')

if size == -1 {
// last metric in buffer
size = len(buff) - offset
Expand All @@ -252,7 +273,7 @@ func handleBuff(buff []byte) {

// check built packet size and send if metric doesn't fit
if packets[target].Len()+size > packetLen {
go sendPacket(packets[target].Bytes(), target, sendproto, TCPtimeout)
go sendPacket(packets[target].Bytes(), target, sendproto, TCPtimeout, boff)
packets[target].Reset()
}
// add to packet
Expand All @@ -262,8 +283,14 @@ func handleBuff(buff []byte) {
log.Printf("Sending %s to %s", buffPrefix, target)
}
if err != nil {
log.Printf("Error %s when adding prefix %s", err, metricsPrefix)
break
if len(metricsPrefix) != 0 {
log.Printf("Error %s when adding prefix %s", err, metricsPrefix)
break
}
if len(metricTags) != 0 {
log.Printf("Error %s when adding tag %s", err, metricTags)
break
}
}
packets[target].Write(buffPrefix)
} else {
Expand Down Expand Up @@ -294,15 +321,15 @@ func handleBuff(buff []byte) {
stats := fmt.Sprintf("%s:%d|c\n", statsMetric, numMetrics)
target := hashRing.GetNode(statsMetric).Server
if packets[target].Len()+len(stats) > packetLen {
sendPacket(packets[target].Bytes(), target, sendproto, TCPtimeout)
sendPacket(packets[target].Bytes(), target, sendproto, TCPtimeout, boff)
packets[target].Reset()
}
packets[target].Write([]byte(stats))

// Empty out any remaining data
for _, target := range hashRing.Nodes() {
if packets[target.Server].Len() > 0 {
sendPacket(packets[target.Server].Bytes(), target.Server, sendproto, TCPtimeout)
sendPacket(packets[target.Server].Bytes(), target.Server, sendproto, TCPtimeout, boff)
}
}

Expand Down Expand Up @@ -347,7 +374,7 @@ func readUDP(ip string, port int, c chan []byte) {

if sendproto == "TCP" {
log.Printf("TCP send timeout set to %s", TCPtimeout)

log.Printf("TCP Backoff set Min: %s Max: %s Factor: %f Retries: %d", TCPMinBackoff, TCPMaxBackoff, TCPFactorBackoff, TCPMaxRetries)
}

if len(metricsPrefix) != 0 {
Expand Down Expand Up @@ -448,6 +475,11 @@ func main() {
flag.BoolVar(&profiling, "pprof", false, "Enable HTTP endpoint for pprof")
flag.StringVar(&profilingBind, "pprof-bind", ":8080", "Bind for pprof HTTP endpoint")

flag.IntVar(&TCPMaxRetries, "backoff-retries", 3, "Maximum number of retries in backoff for TCP dial when sendproto set to TCP")
flag.DurationVar(&TCPMinBackoff, "backoff-min", 50*time.Millisecond, "Backoff minimal (integer) time in Millisecond")
flag.DurationVar(&TCPMaxBackoff, "backoff-max", 1000*time.Millisecond, "Backoff maximal (integer) time in Millisecond")
flag.Float64Var(&TCPFactorBackoff, "backoff-factor", 1.5, "Backoff factor (float)")

defaultBufferSize, err := getSockBufferMaxSize()
if err != nil {
defaultBufferSize = 32 * 1024
Expand Down
21 changes: 21 additions & 0 deletions vendor/github.com/jpillora/backoff/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

119 changes: 119 additions & 0 deletions vendor/github.com/jpillora/backoff/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 9e07749

Please sign in to comment.