Skip to content

Commit

Permalink
list dropped handler
Browse files Browse the repository at this point in the history
  • Loading branch information
lomik committed Oct 23, 2018
1 parent 6d0913c commit c5d25c1
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 12 deletions.
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,16 @@ enabled = false
drop-future = "0s"
drop-past = "0s"

[pprof]
# Golang pprof + some extra locations
#
# Last 1000 points dropped by "drop-future" and "drop-past" rules:
# /debug/receive/tcp/dropped/
# /debug/receive/udp/dropped/
# /debug/receive/pickle/dropped/
# /debug/receive/grpc/dropped/
# /debug/receive/prometheus/dropped/
# /debug/receive/telegraf_http_json/dropped/
[pprof]
listen = "localhost:7007"
enabled = false
```
12 changes: 12 additions & 0 deletions carbon/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ func (app *App) Start() (err error) {
if err != nil {
return
}

http.HandleFunc("/debug/receive/tcp/dropped/", app.TCP.DroppedHandler)
}

if conf.Udp.Enabled {
Expand All @@ -256,6 +258,8 @@ func (app *App) Start() (err error) {
if err != nil {
return
}

http.HandleFunc("/debug/receive/udp/dropped/", app.UDP.DroppedHandler)
}

if conf.Pickle.Enabled {
Expand All @@ -270,6 +274,8 @@ func (app *App) Start() (err error) {
if err != nil {
return
}

http.HandleFunc("/debug/receive/pickle/dropped/", app.Pickle.DroppedHandler)
}

if conf.Grpc.Enabled {
Expand All @@ -283,6 +289,8 @@ func (app *App) Start() (err error) {
if err != nil {
return
}

http.HandleFunc("/debug/receive/grpc/dropped/", app.Grpc.DroppedHandler)
}

if conf.Prometheus.Enabled {
Expand All @@ -296,6 +304,8 @@ func (app *App) Start() (err error) {
if err != nil {
return
}

http.HandleFunc("/debug/receive/prometheus/dropped/", app.Prometheus.DroppedHandler)
}

if conf.TelegrafHttpJson.Enabled {
Expand All @@ -309,6 +319,8 @@ func (app *App) Start() (err error) {
if err != nil {
return
}

http.HandleFunc("/debug/receive/telegraf_http_json/dropped/", app.TelegrafHttpJson.DroppedHandler)
}
/* RECEIVER end */

Expand Down
49 changes: 49 additions & 0 deletions receiver/base.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
package receiver

import (
"fmt"
"net/http"
"sort"
"sync"
"sync/atomic"

"github.com/lomik/carbon-clickhouse/helper/RowBinary"
"github.com/lomik/carbon-clickhouse/helper/stop"
"go.uber.org/zap"
)

const droppedListSize = 1000

type Base struct {
stop.Struct
stat struct {
Expand All @@ -20,6 +26,9 @@ type Base struct {
futureDropped uint64 // atomic
pastDropped uint64 // atomic
}
droppedList [droppedListSize]string
droppedListNext int
droppedListMu sync.Mutex
parseThreads int
dropFutureSeconds uint32
dropPastSeconds uint32
Expand Down Expand Up @@ -53,6 +62,46 @@ func (base *Base) isDrop(nowTime uint32, metricTime uint32) bool {
return false
}

func (base *Base) DroppedHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")

n := make([]string, droppedListSize)
base.droppedListMu.Lock()
copy(n, base.droppedList[:])
base.droppedListMu.Unlock()
sort.Strings(n)
for i := 0; i < len(n); i++ {
if n[i] != "" {
fmt.Fprintln(w, n[i])
}
}
}

func (base *Base) saveDropped(name string, nowTime uint32, metricTime uint32, value float64) {
s := fmt.Sprintf("rcv:%d\tname:%s\ttimestamp:%d\tvalue:%#v", nowTime, name, metricTime, value)

base.droppedListMu.Lock()
base.droppedList[base.droppedListNext%droppedListSize] = s
base.droppedListNext++
base.droppedListMu.Unlock()
}

func (base *Base) isDropString(name string, nowTime uint32, metricTime uint32, value float64) bool {
if !base.isDrop(nowTime, metricTime) {
return false
}
base.saveDropped(name, nowTime, metricTime, value)
return true
}

func (base *Base) isDropBytes(name []byte, nowTime uint32, metricTime uint32, value float64) bool {
if !base.isDrop(nowTime, metricTime) {
return false
}
base.saveDropped(string(name), nowTime, metricTime, value)
return true
}

func (base *Base) SendStat(send func(metric string, value float64), fields ...string) {
for _, f := range fields {
switch f {
Expand Down
2 changes: 1 addition & 1 deletion receiver/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (g *GRPC) doStore(requestCtx context.Context, in *pb.Payload, confirmRequir
m := in.Metrics[i]

for j := 0; j < len(m.Points); j++ {
if g.isDrop(now, m.Points[j].Timestamp) {
if g.isDropString(m.Metric, now, m.Points[j].Timestamp, m.Points[j].Value) {
continue
}

Expand Down
8 changes: 4 additions & 4 deletions receiver/pickle_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,16 @@ func (base *Base) PickleParseBytes(ctx context.Context, b []byte, now uint32) {
}

pickle.ParseMessage(b, func(name string, value float64, timestamp int64) {
if base.isDrop(now, uint32(timestamp)) {
return
}

name, err := tags.Graphite(name)
if err != nil {
// @TODO: log?
return
}

if base.isDropString(name, now, uint32(timestamp), value) {
return
}

if !wb.CanWriteGraphitePoint(len(name)) {
flush()
if len(name) > RowBinary.WriteBufferSize-50 {
Expand Down
2 changes: 1 addition & 1 deletion receiver/plain.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ MainLoop:
continue MainLoop
}

if base.isDrop(b.Time, timestamp) {
if base.isDropBytes(name, b.Time, timestamp, value) {
continue MainLoop
}

Expand Down
2 changes: 1 addition & 1 deletion receiver/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (rcv *PrometheusRemoteWrite) ServeHTTP(w http.ResponseWriter, r *http.Reque
if math.IsNaN(samples[j].Value) {
continue
}
if rcv.isDrop(writer.Now(), uint32(samples[j].Timestamp/1000)) {
if rcv.isDropString(metric, writer.Now(), uint32(samples[j].Timestamp/1000), samples[j].Value) {
continue
}

Expand Down
2 changes: 2 additions & 0 deletions receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package receiver
import (
"fmt"
"net"
"net/http"
"net/url"
"strings"

Expand All @@ -12,6 +13,7 @@ import (

type Receiver interface {
Stat(func(metric string, value float64))
DroppedHandler(w http.ResponseWriter, r *http.Request)
Stop()
}

Expand Down
10 changes: 6 additions & 4 deletions receiver/telegraf_http_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,9 @@ func (rcv *TelegrafHttpJson) ServeHTTP(w http.ResponseWriter, r *http.Request) {

var pathBuf bytes.Buffer

metricsLoop:
for i := 0; i < len(data.Metrics); i++ {
m := data.Metrics[i]
if rcv.isDrop(writer.Now(), uint32(m.Timestamp)) {
continue
}

tags := TelegrafEncodeTags(m.Tags)

Expand Down Expand Up @@ -127,7 +125,11 @@ func (rcv *TelegrafHttpJson) ServeHTTP(w http.ResponseWriter, r *http.Request) {
pathBuf.WriteByte('?')
pathBuf.WriteString(tags)

writer.WritePoint(pathBuf.String(), v, m.Timestamp)
name := pathBuf.String()
if rcv.isDropString(name, writer.Now(), uint32(m.Timestamp), v) {
continue metricsLoop
}
writer.WritePoint(name, v, m.Timestamp)
}
}

Expand Down

0 comments on commit c5d25c1

Please sign in to comment.