diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cedd413 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +.idea +go.mod +go.sum \ No newline at end of file diff --git a/examples/client-example/main.go b/examples/client-example/main.go index 5b38187..86e4aad 100644 --- a/examples/client-example/main.go +++ b/examples/client-example/main.go @@ -4,8 +4,8 @@ import ( "fmt" "log" "os" + "promtail/promtail" "time" - "github.com/afiskon/promtail-client/promtail" ) func displayUsage() { @@ -20,13 +20,13 @@ func displayInvalidName(arg string) { func nameIsValid(name string) bool { for _, c := range name { - if !((c >= 'a' && c <= 'z') || - (c >= 'A' && c <= 'Z') || - (c >= '0' && c <= '9') || - (c == '-') || (c == '_')) { - return false - } - } + if !((c >= 'a' && c <= 'z') || + (c >= 'A' && c <= 'Z') || + (c >= '0' && c <= '9') || + (c == '-') || (c == '_')) { + return false + } + } return true } @@ -50,19 +50,19 @@ func main() { displayInvalidName("job-name") } - labels := "{source=\""+source_name+"\",job=\""+job_name+"\"}" + labels := "{source=\"" + source_name + "\",job=\"" + job_name + "\"}" conf := promtail.ClientConfig{ PushURL: "http://localhost:3100/api/prom/push", Labels: labels, BatchWait: 5 * time.Second, - BatchEntriesNumber: 10000, - SendLevel: promtail.INFO, - PrintLevel: promtail.ERROR, + BatchEntriesNumber: 100, + SendLevel: promtail.INFO, + PrintLevel: promtail.ERROR, } var ( loki promtail.Client - err error + err error ) if format == "proto" { @@ -82,6 +82,7 @@ func main() { loki.Infof("source = %s, time = %s, i = %d\n", source_name, tstamp, i) loki.Warnf("source = %s, time = %s, i = %d\n", source_name, tstamp, i) loki.Errorf("source = %s, time = %s, i = %d\n", source_name, tstamp, i) + loki.LogfWithLabel("{source=\"new_"+source_name+"\"}", "source = %s, time = %s, i = %d\n", source_name, tstamp, i) time.Sleep(1 * time.Second) } diff --git a/promtail/common.go b/promtail/common.go index f44774d..31c7147 100644 --- a/promtail/common.go +++ b/promtail/common.go @@ -38,6 +38,11 @@ type Client interface { Infof(format string, args ...interface{}) Warnf(format string, args ...interface{}) Errorf(format string, args ...interface{}) + DebugfWithLabel(label, format string, args ...interface{}) + InfofWithLabel(label, format string, args ...interface{}) + WarnfWithLabel(label, format string, args ...interface{}) + ErrorfWithLabel(label, format string, args ...interface{}) + LogfWithLabel(label, format string, args ...interface{}) Shutdown() } @@ -67,4 +72,4 @@ func (client *httpClient) sendJsonReq(method, url string, ctype string, reqBody } return resp, resBody, nil -} \ No newline at end of file +} diff --git a/promtail/jsonclient.go b/promtail/jsonclient.go index 6ee022e..5214072 100644 --- a/promtail/jsonclient.go +++ b/promtail/jsonclient.go @@ -8,12 +8,17 @@ import ( "time" ) +type jsonLogEntryLabel struct { + Label string `json:"label"` + Ts time.Time `json:"ts"` + Line string `json:"line"` + level LogLevel // not used in JSON +} type jsonLogEntry struct { Ts time.Time `json:"ts"` Line string `json:"line"` - level LogLevel // not used in JSON + level LogLevel // not used in JSON } - type promtailStream struct { Labels string `json:"labels"` Entries []*jsonLogEntry `json:"entries"` @@ -26,7 +31,7 @@ type promtailMsg struct { type clientJson struct { config *ClientConfig quit chan struct{} - entries chan *jsonLogEntry + entries chan *jsonLogEntryLabel waitGroup sync.WaitGroup client httpClient } @@ -35,7 +40,7 @@ func NewClientJson(conf ClientConfig) (Client, error) { client := clientJson{ config: &conf, quit: make(chan struct{}), - entries: make(chan *jsonLogEntry, LOG_ENTRIES_CHAN_SIZE), + entries: make(chan *jsonLogEntryLabel, LOG_ENTRIES_CHAN_SIZE), client: httpClient{}, } @@ -46,24 +51,47 @@ func NewClientJson(conf ClientConfig) (Client, error) { } func (c *clientJson) Debugf(format string, args ...interface{}) { - c.log(format, DEBUG, "Debug: ", args...) + c.log("", format, DEBUG, "Debug: ", args...) } func (c *clientJson) Infof(format string, args ...interface{}) { - c.log(format, INFO, "Info: ", args...) + c.log("", format, INFO, "Info: ", args...) } func (c *clientJson) Warnf(format string, args ...interface{}) { - c.log(format, WARN, "Warn: ", args...) + c.log("", format, WARN, "Warn: ", args...) } +func (c *clientJson) Logf(format string, args ...interface{}) { + c.log("", format, INFO, "", args...) +} func (c *clientJson) Errorf(format string, args ...interface{}) { - c.log(format, ERROR, "Error: ", args...) + c.log("", format, ERROR, "Error: ", args...) } -func (c *clientJson) log(format string, level LogLevel, prefix string, args ...interface{}) { +func (c *clientJson) DebugfWithLabel(label, format string, args ...interface{}) { + c.log(label, format, DEBUG, "Debug: ", args...) +} + +func (c *clientJson) InfofWithLabel(label, format string, args ...interface{}) { + c.log(label, format, INFO, "Info: ", args...) +} + +func (c *clientJson) WarnfWithLabel(label, format string, args ...interface{}) { + c.log(label, format, WARN, "Warn: ", args...) +} + +func (c *clientJson) ErrorfWithLabel(label, format string, args ...interface{}) { + c.log(label, format, ERROR, "Error: ", args...) +} +func (c *clientJson) LogfWithLabel(label, format string, args ...interface{}) { + c.log(label, format, INFO, "", args...) +} + +func (c *clientJson) log(label, format string, level LogLevel, prefix string, args ...interface{}) { if (level >= c.config.SendLevel) || (level >= c.config.PrintLevel) { - c.entries <- &jsonLogEntry{ + c.entries <- &jsonLogEntryLabel{ + Label: label, Ts: time.Now(), Line: fmt.Sprintf(prefix+format, args...), level: level, @@ -77,7 +105,8 @@ func (c *clientJson) Shutdown() { } func (c *clientJson) run() { - var batch []*jsonLogEntry + var batch map[string][]*jsonLogEntry + batch = make(map[string][]*jsonLogEntry) batchSize := 0 maxWait := time.NewTimer(c.config.BatchWait) @@ -99,11 +128,20 @@ func (c *clientJson) run() { } if entry.level >= c.config.SendLevel { - batch = append(batch, entry) + if entry.Label == "" { + entry.Label = c.config.Labels + } + batch[entry.Label] = append(batch[entry.Label], &jsonLogEntry{ + Ts: entry.Ts, + level: entry.level, + Line: entry.Line, + }) + batchSize++ if batchSize >= c.config.BatchEntriesNumber { c.send(batch) - batch = []*jsonLogEntry{} + batch = make(map[string][]*jsonLogEntry) + batchSize = 0 maxWait.Reset(c.config.BatchWait) } @@ -111,7 +149,8 @@ func (c *clientJson) run() { case <-maxWait.C: if batchSize > 0 { c.send(batch) - batch = []*jsonLogEntry{} + batch = make(map[string][]*jsonLogEntry) + batchSize = 0 } maxWait.Reset(c.config.BatchWait) @@ -119,13 +158,15 @@ func (c *clientJson) run() { } } -func (c *clientJson) send(entries []*jsonLogEntry) { +func (c *clientJson) send(entries map[string][]*jsonLogEntry) { var streams []promtailStream - streams = append(streams, promtailStream{ - Labels: c.config.Labels, - Entries: entries, - }) + for k, v := range entries { + streams = append(streams, promtailStream{ + Labels: k, + Entries: v, + }) + } msg := promtailMsg{Streams: streams} jsonMsg, err := json.Marshal(msg) if err != nil { diff --git a/promtail/protoclient.go b/promtail/protoclient.go index 6764e0f..6d8e5f7 100644 --- a/promtail/protoclient.go +++ b/promtail/protoclient.go @@ -2,15 +2,20 @@ package promtail import ( "fmt" + "github.com/afiskon/promtail-client/logproto" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/timestamp" "github.com/golang/snappy" - "github.com/afiskon/promtail-client/logproto" "log" "sync" "time" ) +type protoLogEntryLabel struct { + entry *logproto.Entry + level LogLevel + label string +} type protoLogEntry struct { entry *logproto.Entry level LogLevel @@ -19,7 +24,7 @@ type protoLogEntry struct { type clientProto struct { config *ClientConfig quit chan struct{} - entries chan protoLogEntry + entries chan protoLogEntryLabel waitGroup sync.WaitGroup client httpClient } @@ -28,7 +33,7 @@ func NewClientProto(conf ClientConfig) (Client, error) { client := clientProto{ config: &conf, quit: make(chan struct{}), - entries: make(chan protoLogEntry, LOG_ENTRIES_CHAN_SIZE), + entries: make(chan protoLogEntryLabel, LOG_ENTRIES_CHAN_SIZE), client: httpClient{}, } @@ -39,25 +44,46 @@ func NewClientProto(conf ClientConfig) (Client, error) { } func (c *clientProto) Debugf(format string, args ...interface{}) { - c.log(format, DEBUG, "Debug: ", args...) + c.log("", format, DEBUG, "Debug: ", args...) } func (c *clientProto) Infof(format string, args ...interface{}) { - c.log(format, INFO, "Info: ", args...) + c.log("", format, INFO, "Info: ", args...) } func (c *clientProto) Warnf(format string, args ...interface{}) { - c.log(format, WARN, "Warn: ", args...) + c.log("", format, WARN, "Warn: ", args...) } func (c *clientProto) Errorf(format string, args ...interface{}) { - c.log(format, ERROR, "Error: ", args...) + c.log("", format, ERROR, "Error: ", args...) +} +func (c *clientProto) Logf(format string, args ...interface{}) { + c.log("", format, INFO, "", args...) +} + +func (c *clientProto) DebugfWithLabel(label, format string, args ...interface{}) { + c.log(label, format, DEBUG, "Debug: ", args...) } -func (c *clientProto) log(format string, level LogLevel, prefix string, args ...interface{}) { +func (c *clientProto) InfofWithLabel(label, format string, args ...interface{}) { + c.log(label, format, INFO, "Info: ", args...) +} + +func (c *clientProto) WarnfWithLabel(label, format string, args ...interface{}) { + c.log(label, format, WARN, "Warn: ", args...) +} + +func (c *clientProto) ErrorfWithLabel(label, format string, args ...interface{}) { + c.log(label, format, ERROR, "Error: ", args...) +} +func (c *clientProto) LogfWithLabel(label, format string, args ...interface{}) { + c.log(label, format, INFO, "", args...) +} +func (c *clientProto) log(label, format string, level LogLevel, prefix string, args ...interface{}) { if (level >= c.config.SendLevel) || (level >= c.config.PrintLevel) { now := time.Now().UnixNano() - c.entries <- protoLogEntry{ + c.entries <- protoLogEntryLabel{ entry: &logproto.Entry{ Timestamp: ×tamp.Timestamp{ Seconds: now / int64(time.Second), @@ -65,6 +91,7 @@ func (c *clientProto) log(format string, level LogLevel, prefix string, args ... }, Line: fmt.Sprintf(prefix+format, args...), }, + label: label, level: level, } } @@ -76,7 +103,9 @@ func (c *clientProto) Shutdown() { } func (c *clientProto) run() { - var batch []*logproto.Entry + var batch map[string][]*logproto.Entry //[]*jsonLogEntryLabel + batch = make(map[string][]*logproto.Entry) + // var batch []*logproto.Entry batchSize := 0 maxWait := time.NewTimer(c.config.BatchWait) @@ -98,11 +127,15 @@ func (c *clientProto) run() { } if entry.level >= c.config.SendLevel { - batch = append(batch, entry.entry) + if entry.label == "" { + entry.label = c.config.Labels + } + batch[entry.label] = append(batch[entry.label], entry.entry) + batchSize++ if batchSize >= c.config.BatchEntriesNumber { c.send(batch) - batch = []*logproto.Entry{} + batch = make(map[string][]*logproto.Entry) batchSize = 0 maxWait.Reset(c.config.BatchWait) } @@ -110,7 +143,7 @@ func (c *clientProto) run() { case <-maxWait.C: if batchSize > 0 { c.send(batch) - batch = []*logproto.Entry{} + batch = make(map[string][]*logproto.Entry) batchSize = 0 } maxWait.Reset(c.config.BatchWait) @@ -118,13 +151,15 @@ func (c *clientProto) run() { } } -func (c *clientProto) send(entries []*logproto.Entry) { +func (c *clientProto) send(entries map[string][]*logproto.Entry) { var streams []*logproto.Stream - streams = append(streams, &logproto.Stream{ - Labels: c.config.Labels, - Entries: entries, - }) + for k, v := range entries { + streams = append(streams, &logproto.Stream{ + Labels: k, + Entries: v, + }) + } req := logproto.PushRequest{ Streams: streams, }