diff --git a/README.md b/README.md index 42ff850..03d66fe 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,10 @@ Command synopsis: -packetlen="1400": Max packet length. Must be lower than MTU plus IPv4 and UDP headers to avoid fragmentation. -sendproto="UDP": IP Protocol for sending data: TCP, UDP, or TEST -tcptimeout="1s": Timeout for TCP client remote connections + -backoff-retries="3": Maximum number of retries in backoff for TCP dial when sendproto set to TCP + -backoff-min="50ms": Backoff minimal (integer) time in Millisecond + -backoff-max="1s": Backoff maximal (integer) time in Millisecond + -backoff-factor="1.5": Backoff factor (float) -pprof=false: Golang profiling support -pprof-bind=":8080": Listen host:port for HTTP pprof data -verbose=false: Verbose output diff --git a/glide.lock b/glide.lock new file mode 100644 index 0000000..ef545e1 --- /dev/null +++ b/glide.lock @@ -0,0 +1,6 @@ +hash: 8185028e361b69d497f549b9a21aa8e39b20a1a5ac3638f20f0bfd1bed182828 +updated: 2017-04-26T10:05:23.227245207+02:00 +imports: +- name: github.com/jpillora/backoff + version: 06c7a16c845dc8e0bf575fafeeca0f5462f5eb4d +testImports: [] diff --git a/glide.yaml b/glide.yaml new file mode 100644 index 0000000..69bb592 --- /dev/null +++ b/glide.yaml @@ -0,0 +1,3 @@ +package: github.com/jjneely/statsrelay +imports: +- package: github.com/jpillora/backoff diff --git a/src b/src new file mode 120000 index 0000000..a725465 --- /dev/null +++ b/src @@ -0,0 +1 @@ +vendor/ \ No newline at end of file diff --git a/statsrelay.go b/statsrelay.go index 291862c..93bf3bf 100644 --- a/statsrelay.go +++ b/statsrelay.go @@ -5,6 +5,7 @@ import ( "errors" "flag" "fmt" + "github.com/jpillora/backoff" "io/ioutil" "log" "net" @@ -89,6 +90,18 @@ var profilingBind string // maxprocs int value to set GOMAXPROCS var maxprocs int +// TCPMaxRetries int value for number of retries in dial tcp +var TCPMaxRetries int + +// TCPMinBackoff duration value for minimal backoff limit time +var TCPMinBackoff time.Duration + +// TCPMaxBackoff duration value for maximum backoff limit time +var TCPMaxBackoff time.Duration + +// TCPFactorBackoff float64 value for backoff factor +var TCPFactorBackoff float64 + // sockBufferMaxSize() returns the maximum size that the UDP receive buffer // in the kernel can be set to. In bytes. func getSockBufferMaxSize() (int, error) { @@ -161,7 +174,7 @@ func genTags(metric, metricTags string) string { // sendPacket takes a []byte and writes that directly to a UDP socket // that was assigned for target. -func sendPacket(buff []byte, target string, sendproto string, TCPtimeout time.Duration) { +func sendPacket(buff []byte, target string, sendproto string, TCPtimeout time.Duration, boff *backoff.Backoff) { switch sendproto { case "UDP": conn, err := net.ListenUDP("udp", nil) @@ -171,17 +184,19 @@ func sendPacket(buff []byte, target string, sendproto string, TCPtimeout time.Du conn.WriteToUDP(buff, udpAddr[target]) conn.Close() case "TCP": - tcpAddr, err := net.ResolveTCPAddr("tcp", target) - if err != nil { - log.Fatalf("ResolveTCPAddr Failed %s\n", err.Error()) - } - conn, err := net.DialTimeout("tcp", target, TCPtimeout) - if err != nil { - log.Printf("TCP error for %s - %s\n", tcpAddr, err) - break + for i := 0; i < TCPMaxRetries; i++ { + conn, err := net.DialTimeout("tcp", target, TCPtimeout) + if err != nil { + doff := boff.Duration() + log.Printf("TCP error for %s - %s [Reconnecting in %s, retries left %d/%d]\n", + target, err, doff, TCPMaxRetries-i, TCPMaxRetries) + time.Sleep(doff) + continue + } + boff.Reset() + conn.Write(buff) + defer conn.Close() } - conn.Write(buff) - defer conn.Close() case "TEST": if verbose { log.Printf("Debug: Would have sent packet of %d bytes to %s", @@ -216,6 +231,13 @@ func handleBuff(buff []byte) { numMetrics := 0 statsMetric := prefix + ".statsProcessed" + boff := &backoff.Backoff{ + Min: TCPMinBackoff, + Max: TCPMaxBackoff, + Factor: TCPFactorBackoff, + Jitter: false, + } + for offset := 0; offset < len(buff); { loop: for offset < len(buff) { @@ -233,7 +255,6 @@ func handleBuff(buff []byte) { } size := bytes.IndexByte(buff[offset:], '\n') - if size == -1 { // last metric in buffer size = len(buff) - offset @@ -252,7 +273,7 @@ func handleBuff(buff []byte) { // check built packet size and send if metric doesn't fit if packets[target].Len()+size > packetLen { - go sendPacket(packets[target].Bytes(), target, sendproto, TCPtimeout) + go sendPacket(packets[target].Bytes(), target, sendproto, TCPtimeout, boff) packets[target].Reset() } // add to packet @@ -262,8 +283,14 @@ func handleBuff(buff []byte) { log.Printf("Sending %s to %s", buffPrefix, target) } if err != nil { - log.Printf("Error %s when adding prefix %s", err, metricsPrefix) - break + if len(metricsPrefix) != 0 { + log.Printf("Error %s when adding prefix %s", err, metricsPrefix) + break + } + if len(metricTags) != 0 { + log.Printf("Error %s when adding tag %s", err, metricTags) + break + } } packets[target].Write(buffPrefix) } else { @@ -294,7 +321,7 @@ func handleBuff(buff []byte) { stats := fmt.Sprintf("%s:%d|c\n", statsMetric, numMetrics) target := hashRing.GetNode(statsMetric).Server if packets[target].Len()+len(stats) > packetLen { - sendPacket(packets[target].Bytes(), target, sendproto, TCPtimeout) + sendPacket(packets[target].Bytes(), target, sendproto, TCPtimeout, boff) packets[target].Reset() } packets[target].Write([]byte(stats)) @@ -302,7 +329,7 @@ func handleBuff(buff []byte) { // Empty out any remaining data for _, target := range hashRing.Nodes() { if packets[target.Server].Len() > 0 { - sendPacket(packets[target.Server].Bytes(), target.Server, sendproto, TCPtimeout) + sendPacket(packets[target.Server].Bytes(), target.Server, sendproto, TCPtimeout, boff) } } @@ -347,7 +374,7 @@ func readUDP(ip string, port int, c chan []byte) { if sendproto == "TCP" { log.Printf("TCP send timeout set to %s", TCPtimeout) - + log.Printf("TCP Backoff set Min: %s Max: %s Factor: %f Retries: %d", TCPMinBackoff, TCPMaxBackoff, TCPFactorBackoff, TCPMaxRetries) } if len(metricsPrefix) != 0 { @@ -448,6 +475,11 @@ func main() { flag.BoolVar(&profiling, "pprof", false, "Enable HTTP endpoint for pprof") flag.StringVar(&profilingBind, "pprof-bind", ":8080", "Bind for pprof HTTP endpoint") + flag.IntVar(&TCPMaxRetries, "backoff-retries", 3, "Maximum number of retries in backoff for TCP dial when sendproto set to TCP") + flag.DurationVar(&TCPMinBackoff, "backoff-min", 50*time.Millisecond, "Backoff minimal (integer) time in Millisecond") + flag.DurationVar(&TCPMaxBackoff, "backoff-max", 1000*time.Millisecond, "Backoff maximal (integer) time in Millisecond") + flag.Float64Var(&TCPFactorBackoff, "backoff-factor", 1.5, "Backoff factor (float)") + defaultBufferSize, err := getSockBufferMaxSize() if err != nil { defaultBufferSize = 32 * 1024 diff --git a/vendor/github.com/jpillora/backoff/LICENSE b/vendor/github.com/jpillora/backoff/LICENSE new file mode 100644 index 0000000..1cc7080 --- /dev/null +++ b/vendor/github.com/jpillora/backoff/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2017 Jaime Pillora + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/github.com/jpillora/backoff/README.md b/vendor/github.com/jpillora/backoff/README.md new file mode 100644 index 0000000..81e77cd --- /dev/null +++ b/vendor/github.com/jpillora/backoff/README.md @@ -0,0 +1,119 @@ +# Backoff + +A simple exponential backoff counter in Go (Golang) + +[![GoDoc](https://godoc.org/github.com/jpillora/backoff?status.svg)](https://godoc.org/github.com/jpillora/backoff) [![Circle CI](https://circleci.com/gh/jpillora/backoff.svg?style=shield)](https://circleci.com/gh/jpillora/backoff) + +### Install + +``` +$ go get -v github.com/jpillora/backoff +``` + +### Usage + +Backoff is a `time.Duration` counter. It starts at `Min`. After every call to `Duration()` it is multiplied by `Factor`. It is capped at `Max`. It returns to `Min` on every call to `Reset()`. `Jitter` adds randomness ([see below](#example-using-jitter)). Used in conjunction with the `time` package. + +--- + +#### Simple example + +``` go + +b := &backoff.Backoff{ + //These are the defaults + Min: 100 * time.Millisecond, + Max: 10 * time.Second, + Factor: 2, + Jitter: false, +} + +fmt.Printf("%s\n", b.Duration()) +fmt.Printf("%s\n", b.Duration()) +fmt.Printf("%s\n", b.Duration()) + +fmt.Printf("Reset!\n") +b.Reset() + +fmt.Printf("%s\n", b.Duration()) +``` + +``` +100ms +200ms +400ms +Reset! +100ms +``` + +--- + +#### Example using `net` package + +``` go +b := &backoff.Backoff{ + Max: 5 * time.Minute, +} + +for { + conn, err := net.Dial("tcp", "example.com:5309") + if err != nil { + d := b.Duration() + fmt.Printf("%s, reconnecting in %s", err, d) + time.Sleep(d) + continue + } + //connected + b.Reset() + conn.Write([]byte("hello world!")) + // ... Read ... Write ... etc + conn.Close() + //disconnected +} + +``` + +--- + +#### Example using `Jitter` + +Enabling `Jitter` adds some randomization to the backoff durations. [See Amazon's writeup of performance gains using jitter](http://www.awsarchitectureblog.com/2015/03/backoff.html). Seeding is not necessary but doing so gives repeatable results. + +```go +import "math/rand" + +b := &backoff.Backoff{ + Jitter: true, +} + +rand.Seed(42) + +fmt.Printf("%s\n", b.Duration()) +fmt.Printf("%s\n", b.Duration()) +fmt.Printf("%s\n", b.Duration()) + +fmt.Printf("Reset!\n") +b.Reset() + +fmt.Printf("%s\n", b.Duration()) +fmt.Printf("%s\n", b.Duration()) +fmt.Printf("%s\n", b.Duration()) +``` + +``` +100ms +106.600049ms +281.228155ms +Reset! +100ms +104.381845ms +214.957989ms +``` + +#### Documentation + +https://godoc.org/github.com/jpillora/backoff + +#### Credits + +Forked from some JavaScript written by [@tj](https://github.com/tj) \ No newline at end of file diff --git a/vendor/github.com/jpillora/backoff/backoff.go b/vendor/github.com/jpillora/backoff/backoff.go new file mode 100644 index 0000000..a50d0e9 --- /dev/null +++ b/vendor/github.com/jpillora/backoff/backoff.go @@ -0,0 +1,88 @@ +// Package backoff provides an exponential-backoff implementation. +package backoff + +import ( + "math" + "math/rand" + "time" +) + +// Backoff is a time.Duration counter, starting at Min. After every call to +// the Duration method the current timing is multiplied by Factor, but it +// never exceeds Max. +// +// Backoff is not generally concurrent-safe, but the ForAttempt method can +// be used concurrently. +type Backoff struct { + //Factor is the multiplying factor for each increment step + attempt, Factor float64 + //Jitter eases contention by randomizing backoff steps + Jitter bool + //Min and Max are the minimum and maximum values of the counter + Min, Max time.Duration +} + +// Duration returns the duration for the current attempt before incrementing +// the attempt counter. See ForAttempt. +func (b *Backoff) Duration() time.Duration { + d := b.ForAttempt(b.attempt) + b.attempt++ + return d +} + +const maxInt64 = float64(math.MaxInt64 - 512) + +// ForAttempt returns the duration for a specific attempt. This is useful if +// you have a large number of independent Backoffs, but don't want use +// unnecessary memory storing the Backoff parameters per Backoff. The first +// attempt should be 0. +// +// ForAttempt is concurrent-safe. +func (b *Backoff) ForAttempt(attempt float64) time.Duration { + // Zero-values are nonsensical, so we use + // them to apply defaults + min := b.Min + if min <= 0 { + min = 100 * time.Millisecond + } + max := b.Max + if max <= 0 { + max = 10 * time.Second + } + if min >= max { + // short-circuit + return max + } + factor := b.Factor + if factor <= 0 { + factor = 2 + } + //calculate this duration + minf := float64(min) + durf := minf * math.Pow(factor, attempt) + if b.Jitter { + durf = rand.Float64()*(durf-minf) + minf + } + //ensure float64 wont overflow int64 + if durf > maxInt64 { + return max + } + dur := time.Duration(durf) + //keep within bounds + if dur < min { + return min + } else if dur > max { + return max + } + return dur +} + +// Reset restarts the current attempt counter at zero. +func (b *Backoff) Reset() { + b.attempt = 0 +} + +// Attempt returns the current attempt counter value. +func (b *Backoff) Attempt() float64 { + return b.attempt +} diff --git a/vendor/github.com/jpillora/backoff/backoff_test.go b/vendor/github.com/jpillora/backoff/backoff_test.go new file mode 100644 index 0000000..cb36833 --- /dev/null +++ b/vendor/github.com/jpillora/backoff/backoff_test.go @@ -0,0 +1,126 @@ +package backoff + +import ( + "reflect" + "testing" + "time" +) + +func Test1(t *testing.T) { + + b := &Backoff{ + Min: 100 * time.Millisecond, + Max: 10 * time.Second, + Factor: 2, + } + + equals(t, b.Duration(), 100*time.Millisecond) + equals(t, b.Duration(), 200*time.Millisecond) + equals(t, b.Duration(), 400*time.Millisecond) + b.Reset() + equals(t, b.Duration(), 100*time.Millisecond) +} + +func TestForAttempt(t *testing.T) { + + b := &Backoff{ + Min: 100 * time.Millisecond, + Max: 10 * time.Second, + Factor: 2, + } + + equals(t, b.ForAttempt(0), 100*time.Millisecond) + equals(t, b.ForAttempt(1), 200*time.Millisecond) + equals(t, b.ForAttempt(2), 400*time.Millisecond) + b.Reset() + equals(t, b.ForAttempt(0), 100*time.Millisecond) +} + +func Test2(t *testing.T) { + + b := &Backoff{ + Min: 100 * time.Millisecond, + Max: 10 * time.Second, + Factor: 1.5, + } + + equals(t, b.Duration(), 100*time.Millisecond) + equals(t, b.Duration(), 150*time.Millisecond) + equals(t, b.Duration(), 225*time.Millisecond) + b.Reset() + equals(t, b.Duration(), 100*time.Millisecond) +} + +func Test3(t *testing.T) { + + b := &Backoff{ + Min: 100 * time.Nanosecond, + Max: 10 * time.Second, + Factor: 1.75, + } + + equals(t, b.Duration(), 100*time.Nanosecond) + equals(t, b.Duration(), 175*time.Nanosecond) + equals(t, b.Duration(), 306*time.Nanosecond) + b.Reset() + equals(t, b.Duration(), 100*time.Nanosecond) +} + +func Test4(t *testing.T) { + b := &Backoff{ + Min: 500 * time.Second, + Max: 100 * time.Second, + Factor: 1, + } + + equals(t, b.Duration(), b.Max) +} + +func TestGetAttempt(t *testing.T) { + b := &Backoff{ + Min: 100 * time.Millisecond, + Max: 10 * time.Second, + Factor: 2, + } + equals(t, b.Attempt(), float64(0)) + equals(t, b.Duration(), 100*time.Millisecond) + equals(t, b.Attempt(), float64(1)) + equals(t, b.Duration(), 200*time.Millisecond) + equals(t, b.Attempt(), float64(2)) + equals(t, b.Duration(), 400*time.Millisecond) + equals(t, b.Attempt(), float64(3)) + b.Reset() + equals(t, b.Attempt(), float64(0)) + equals(t, b.Duration(), 100*time.Millisecond) + equals(t, b.Attempt(), float64(1)) +} + +func TestJitter(t *testing.T) { + b := &Backoff{ + Min: 100 * time.Millisecond, + Max: 10 * time.Second, + Factor: 2, + Jitter: true, + } + + equals(t, b.Duration(), 100*time.Millisecond) + between(t, b.Duration(), 100*time.Millisecond, 200*time.Millisecond) + between(t, b.Duration(), 100*time.Millisecond, 400*time.Millisecond) + b.Reset() + equals(t, b.Duration(), 100*time.Millisecond) +} + +func between(t *testing.T, actual, low, high time.Duration) { + if actual < low { + t.Fatalf("Got %s, Expecting >= %s", actual, low) + } + if actual > high { + t.Fatalf("Got %s, Expecting <= %s", actual, high) + } +} + +func equals(t *testing.T, v1, v2 interface{}) { + if !reflect.DeepEqual(v1, v2) { + t.Fatalf("Got %v, Expecting %v", v1, v2) + } +}