From 1fb65f3aa279385e74b7a46aea6785fa86875396 Mon Sep 17 00:00:00 2001
From: chenshangmin
Date: Fri, 14 Jan 2022 13:16:38 +0800
Subject: [PATCH] [PDR-16012][feat] logkit send interface latency and internal
 queue length metrics
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 mgr/metric_runner.go     |  15 +++--
 mgr/runner.go            |  16 +++--
 reader/meta.go           |   2 +-
 reader/meta_test.go      |   2 +-
 sender/fault_tolerant.go | 131 ++++++++++++++++++++++++++++++++++++---
 utils/models/models.go   |   2 +
 6 files changed, 148 insertions(+), 20 deletions(-)

diff --git a/mgr/metric_runner.go b/mgr/metric_runner.go
index 50c46bc58..b8b7b6b3d 100644
--- a/mgr/metric_runner.go
+++ b/mgr/metric_runner.go
@@ -295,7 +295,7 @@ func (r *MetricRunner) Run() {
 			dataCnt := 0
 			datas := make([]Data, 0)
 			metricTime := time.Now()
-			tags[metric.Timestamp] = metricTime.Format(time.RFC3339Nano)
+			tags[metric.Timestamp] = metricTime.UnixNano() / 1e6
 			for _, c := range r.collectors {
 				metricName := c.Name()
 				tmpdatas, err := c.Collect()
@@ -610,10 +610,14 @@ func (mr *MetricRunner) StatusRestore() {
 		}
 		sStatus, ok := s.(sender.StatsSender)
 		if ok {
-			sStatus.Restore(&StatsInfo{
+			statsInfo := &StatsInfo{
 				Success: info[0],
 				Errors:  info[1],
-			})
+			}
+			if len(info) > 2 {
+				statsInfo.FtSendLag = info[2]
+			}
+			sStatus.Restore(statsInfo)
 		}
 		status, ext := mr.rs.SenderStats[name]
 		if !ext {
@@ -635,7 +639,7 @@ func (mr *MetricRunner) StatusBackup() {
 			status.ParserStats.Success,
 			status.ParserStats.Errors,
 		},
-		SenderCnt: map[string][2]int64{},
+		SenderCnt: map[string][]int64{},
 	}
 	for _, s := range mr.senders {
 		name := s.Name()
@@ -646,9 +650,10 @@ func (mr *MetricRunner) StatusBackup() {
 			status.SenderStats[name] = senderStats
 		}
 		if sta, exist := status.SenderStats[name]; exist {
-			bStart.SenderCnt[name] = [2]int64{
+			bStart.SenderCnt[name] = []int64{
 				sta.Success,
 				sta.Errors,
+				sta.FtSendLag,
 			}
 		}
 	}
diff --git a/mgr/runner.go b/mgr/runner.go
index e850234ce..90b3d649d 100644
--- a/mgr/runner.go
+++ b/mgr/runner.go
@@ -1476,10 +1476,14 @@ func (r *LogExportRunner) StatusRestore() {
 		}
 		sStatus, ok := s.(sender.StatsSender)
 		if ok {
-			sStatus.Restore(&StatsInfo{
+			statsInfo := &StatsInfo{
 				Success: info[0],
 				Errors:  info[1],
-			})
+			}
+			if len(info) > 2 {
+				statsInfo.FtSendLag = info[2]
+			}
+			sStatus.Restore(statsInfo)
 		}
 		status, ext := r.rs.SenderStats[name]
 		if !ext {
@@ -1519,7 +1523,7 @@ func (r *LogExportRunner) StatusBackup() {
 			status.ParserStats.Errors,
 		},
 		TransCnt:  map[string][2]int64{},
-		SenderCnt: map[string][2]int64{},
+		SenderCnt: map[string][]int64{},
 	}
 	r.historyMutex.Lock()
 	defer r.historyMutex.Unlock()
@@ -1535,9 +1539,10 @@ func (r *LogExportRunner) StatusBackup() {
 	for idx, t := range r.transformers {
 		name := formatTransformName(t.Type(), idx)
 		sta := t.Stats()
-		bStart.SenderCnt[name] = [2]int64{
+		bStart.SenderCnt[name] = []int64{
 			sta.Success,
 			sta.Errors,
+			sta.FtSendLag,
 		}
 	}

@@ -1563,9 +1568,10 @@ func (r *LogExportRunner) StatusBackup() {
 			status.SenderStats[name] = senderStats
 		}
 		if sta, exist := status.SenderStats[name]; exist {
-			bStart.SenderCnt[name] = [2]int64{
+			bStart.SenderCnt[name] = []int64{
 				sta.Success,
 				sta.Errors,
+				sta.FtSendLag,
 			}
 		}
 	}
diff --git a/reader/meta.go b/reader/meta.go
index f1b84e6be..308cc9d93 100644
--- a/reader/meta.go
+++ b/reader/meta.go
@@ -48,7 +48,7 @@ const (
 type Statistic struct {
 	ReaderCnt int64    `json:"reader_count"` // total lines read
 	ParserCnt [2]int64 `json:"parser_connt"` // [parse successes, parse failures]
-	SenderCnt map[string][2]int64 `json:"sender_count"` // [send successes, send failures]
+	SenderCnt map[string][]int64  `json:"sender_count"` // [send successes, send failures, ft send lag]
 	TransCnt    map[string][2]int64 `json:"transform_count"` // [transform successes, transform failures]
 	ReadErrors  ErrorStatistic      `json:"read_errors"`
 	ParseErrors ErrorStatistic      `json:"parse_errors"`
diff --git a/reader/meta_test.go b/reader/meta_test.go
index 195557364..0a0e91e09 100644
--- a/reader/meta_test.go
+++ b/reader/meta_test.go
@@ -109,7 +109,7 @@ func TestMeta(t *testing.T) {
 	stat := &Statistic{
 		ReaderCnt: 6,
 		ParserCnt: [2]int64{6, 8},
-		SenderCnt: map[string][2]int64{
+		SenderCnt: map[string][]int64{
 			"aaa": {1, 2},
 			"bbb": {5, 6},
 		},
diff --git a/sender/fault_tolerant.go b/sender/fault_tolerant.go
index d53aa7470..15ed72e2c 100644
--- a/sender/fault_tolerant.go
+++ b/sender/fault_tolerant.go
@@ -4,8 +4,10 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io/ioutil"
 	"math"
 	"os"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"sync"
@@ -32,6 +34,12 @@ const (
 	defaultMaxProcs = 1 // no concurrency by default
 	// TypeMarshalError means marshal failed
 	TypeMarshalError = reqerr.SendErrorType("Data Marshal failed")
+	// KeyUnMarshalError is the error-message prefix used when queued bytes cannot be unmarshaled
+	KeyUnMarshalError = "Data unmarshal failed"
+	// NumUnMarshalError is how many consecutive unmarshal errors are tolerated before sleeping
+	NumUnMarshalError = 10
+	// LagFilename is the file that persists the send lag across restarts
+	LagFilename = "meta.lag"
 )

 var _ SkipDeepCopySender = &FtSender{}
@@ -202,6 +210,9 @@ func newFtSender(innerSender Sender, runnerName string, opt *FtOption) (*FtSende
 		isBlock: opt.isBlock,
 		backoff: utils.NewBackoff(2, 1, 1*time.Second, 5*time.Minute),
 	}
+	ftSender.statsMutex.Lock()
+	ftSender.stats.FtSendLag = ftSender.readLag()
+	ftSender.statsMutex.Unlock()

 	if opt.innerSenderType == TypePandora {
 		ftSender.pandoraKeyCache = make(map[string]KeyInfo)
@@ -269,9 +280,17 @@ func (ft *FtSender) RawSend(datas []string) error {
 		} else {
 			// lastError and sendError in se are both empty, so se.FtQueueLag must be used
 			se.AddSuccessNum(len(datas))
+			ft.statsMutex.Lock()
+			ft.stats.FtSendLag = ft.stats.FtSendLag + int64(len(datas))
+			ft.statsMutex.Unlock()
 			ft.backoff.Reset()
 		}
 		se.FtQueueLag = ft.BackupQueue.Depth() + ft.logQueue.Depth()
+		if se.FtQueueLag == 0 {
+			ft.statsMutex.Lock()
+			ft.stats.FtSendLag = 0
+			ft.statsMutex.Unlock()
+		}
 	}
 	return se
 }
@@ -350,9 +369,17 @@ func (ft *FtSender) Send(datas []Data) error {
 	} else {
 		// lastError and sendError in se are both empty, so se.FtQueueLag must be used
 		se.AddSuccessNum(len(datas))
+		ft.statsMutex.Lock()
+		ft.stats.FtSendLag = ft.stats.FtSendLag + int64(len(datas))
+		ft.statsMutex.Unlock()
 		ft.backoff.Reset()
 	}
 	se.FtQueueLag = ft.BackupQueue.Depth() + ft.logQueue.Depth()
+	if se.FtQueueLag == 0 {
+		ft.statsMutex.Lock()
+		ft.stats.FtSendLag = 0
+		ft.statsMutex.Unlock()
+	}
 	return se
 }
@@ -391,6 +418,9 @@ func (ft *FtSender) Close() error {
 	// persist queue's meta data
 	ft.logQueue.Close()
 	ft.BackupQueue.Close()
+	ft.statsMutex.Lock()
+	ft.writeLag(ft.stats.FtSendLag)
+	ft.statsMutex.Unlock()

 	return ft.innerSender.Close()
 }
@@ -477,6 +507,9 @@ func (ft *FtSender) saveToFile(datas []Data) error {
 }

 func (ft *FtSender) asyncSendLogFromQueue() {
+	// wait a little before consuming, otherwise the restored queue lag may be cleared immediately
+	time.Sleep(time.Second * 10)
+
 	for i := 0; i < ft.procs; i++ {
 		if ft.opt.sendRaw {
 			readLinesChan := make(<-chan []string)
@@ -502,18 +535,31 @@
 }

 // trySendBytes unmarshals the data from bytes and then tries to send it
-func (ft *FtSender) trySendBytes(dat []byte, failSleep int, isRetry bool) (backDataContext []*datasContext, err error) {
+func (ft *FtSender) trySendBytes(dat []byte, failSleep int, isRetry bool, isFromQueue bool) (backDataContext []*datasContext, err error) {
 	if ft.opt.sendRaw {
 		datas, err := ft.unmarshalRaws(dat)
 		if err != nil {
-			return nil, err
+			return nil, errors.New(KeyUnMarshalError + ":" + err.Error())
 		}
+		ft.statsMutex.Lock()
+		ft.stats.FtSendLag = ft.stats.FtSendLag - int64(len(datas))
+		if ft.stats.FtSendLag < 0 {
+			ft.stats.FtSendLag = 0
+		}
+		ft.statsMutex.Unlock()
+
 		return ft.backOffSendRawFromQueue(datas, failSleep, isRetry)
 	}

 	datas, err := ft.unmarshalData(dat)
 	if err != nil {
-		return nil, err
+		return nil, errors.New(KeyUnMarshalError + ":" + err.Error())
+	}
+	ft.statsMutex.Lock()
+	ft.stats.FtSendLag = ft.stats.FtSendLag - int64(len(datas))
+	if ft.stats.FtSendLag < 0 {
+		ft.stats.FtSendLag = 0
 	}
+	ft.statsMutex.Unlock()
 	return ft.backOffSendFromQueue(datas, failSleep, isRetry)
 }
@@ -562,6 +608,9 @@ func (ft *FtSender) trySendRaws(datas []string, failSleep int, isRetry bool) (ba
 			log.Errorf("Runner[%v] Sender[%v] cannot write points back to queue %v: %v, discard datas %d", ft.runnerName, ft.innerSender.Name(), ft.BackupQueue.Name(), err, len(datas))
 			return nil, nil
 		}
+		ft.statsMutex.Lock()
+		ft.stats.FtSendLag += int64(len(v.Lines))
+		ft.statsMutex.Unlock()
 	}

 	time.Sleep(time.Second * time.Duration(math.Pow(2, float64(failSleep))))
@@ -616,6 +665,9 @@ func (ft *FtSender) trySendDatas(datas []Data, failSleep int, isRetry bool) (bac
 			log.Errorf("Runner[%v] Sender[%v] cannot write points back to queue %v: %v, discard datas %d", ft.runnerName, ft.innerSender.Name(), ft.BackupQueue.Name(), err, len(datas))
 			return nil, nil
 		}
+		ft.statsMutex.Lock()
+		ft.stats.FtSendLag += int64(len(v.Datas))
+		ft.statsMutex.Unlock()
 	}

 	time.Sleep(time.Second * time.Duration(math.Pow(2, float64(failSleep))))
@@ -876,6 +928,7 @@ func (ft *FtSender) sendRawFromQueue(queueName string, readChan <-chan []byte, r
 	timer := time.NewTicker(time.Second)
 	defer timer.Stop()
 	numWaits := 1
+	unmarshalDataError := 0
 	var curDataContext, otherDataContext []*datasContext
 	var curIdx int
 	var backDataContext []*datasContext
@@ -891,8 +944,14 @@ func (ft *FtSender) sendRawFromQueue(queueName string, readChan <-chan []byte, r
 		} else {
 			select {
 			case bytes := <-readChan:
-				backDataContext, err = ft.trySendBytes(bytes, numWaits, isRetry)
+				backDataContext, err = ft.trySendBytes(bytes, numWaits, isRetry, true)
 			case datas := <-readDatasChan:
+				ft.statsMutex.Lock()
+				ft.stats.FtSendLag = ft.stats.FtSendLag - int64(len(datas))
+				if ft.stats.FtSendLag < 0 {
+					ft.stats.FtSendLag = 0
+				}
+				ft.statsMutex.Unlock()
 				backDataContext, err = ft.backOffSendRawFromQueue(datas, numWaits, isRetry)
 			case <-timer.C:
 				continue
@@ -908,6 +967,15 @@ func (ft *FtSender) sendRawFromQueue(queueName string, readChan <-chan []byte, r
 			if numWaits > 5 {
 				numWaits = 5
 			}
+			if strings.HasPrefix(err.Error(), KeyUnMarshalError) {
+				unmarshalDataError++
+				if unmarshalDataError > NumUnMarshalError {
+					time.Sleep(time.Second)
+					log.Errorf("Runner[%s] Sender[%s] queue[%s] sleep 1s due to unmarshal error: %v", ft.runnerName, ft.innerSender.Name(), queueName, err)
+				}
+			} else {
+				unmarshalDataError = 0
+			}
 		}
 		if backDataContext != nil {
 			otherDataContext = append(otherDataContext, backDataContext...)
@@ -924,6 +992,7 @@ func (ft *FtSender) sendFromQueue(queueName string, readChan <-chan []byte, read
 	timer := time.NewTicker(time.Second)
 	defer timer.Stop()
 	numWaits := 1
+	unmarshalDataError := 0
 	var curDataContext, otherDataContext []*datasContext
 	var curIdx int
 	var backDataContext []*datasContext
@@ -939,8 +1008,14 @@ func (ft *FtSender) sendFromQueue(queueName string, readChan <-chan []byte, read
 		} else {
 			select {
 			case bytes := <-readChan:
-				backDataContext, err = ft.trySendBytes(bytes, numWaits, isRetry)
+				backDataContext, err = ft.trySendBytes(bytes, numWaits, isRetry, true)
 			case datas := <-readDatasChan:
+				ft.statsMutex.Lock()
+				ft.stats.FtSendLag = ft.stats.FtSendLag - int64(len(datas))
+				if ft.stats.FtSendLag < 0 {
+					ft.stats.FtSendLag = 0
+				}
+				ft.statsMutex.Unlock()
 				backDataContext, err = ft.backOffSendFromQueue(datas, numWaits, isRetry)
 			case <-timer.C:
 				continue
@@ -956,6 +1031,15 @@ func (ft *FtSender) sendFromQueue(queueName string, readChan <-chan []byte, read
 			if numWaits > 5 {
 				numWaits = 5
 			}
+			if strings.HasPrefix(err.Error(), KeyUnMarshalError) {
+				unmarshalDataError++
+				if unmarshalDataError > NumUnMarshalError {
+					time.Sleep(time.Second)
+					log.Errorf("Runner[%s] Sender[%s] queue[%s] sleep 1s due to unmarshal error: %v", ft.runnerName, ft.innerSender.Name(), queueName, err)
+				}
+			} else {
+				unmarshalDataError = 0
+			}
 		}
 		if backDataContext != nil {
 			otherDataContext = append(otherDataContext, backDataContext...)
@@ -993,8 +1077,8 @@ func SplitData(data string) (valArray []string) {
 			valArray = SplitDataWithSplitSize(valArray, data[start:offset], DefaultSplitSize)
 			if len(valArray) > 0 {
 				// the last shard takes part in the next split
-				start = offset - len(valArray[len(valArray) - 1])
-				valArray = valArray[:len(valArray) - 1]
+				start = offset - len(valArray[len(valArray)-1])
+				valArray = valArray[:len(valArray)-1]
 			}
 			continue
 		}
@@ -1017,7 +1101,7 @@ func SplitDataWithSplitSize(originArray []string, data string, splitSize int64)
 	if len(originArray) != 0 {
 		num := (DefaultMaxBatchSize - int64(len(originArray[len(originArray)-1]))) / splitSize
 		if num > 0 {
-			end := num*splitSize
+			end := num * splitSize
 			if end > int64(len(data)) {
 				end = int64(len(data))
 			}
@@ -1200,3 +1284,34 @@ func (ft *FtSender) backOffReTrySendRaw(lines []string, isRetry bool) (res []*da
 		time.Sleep(backoff.Duration())
 	}
 }
+
+// readLag reads the persisted send lag from file
+func (ft *FtSender) readLag() int64 {
+	path := filepath.Join(ft.opt.saveLogPath, LagFilename)
+	f, err := ioutil.ReadFile(path)
+	if err != nil {
+		log.Errorf("Runner[%v] Sender[%v] read file error: %v", ft.runnerName, ft.innerSender.Name(), err)
+		return 0
+	}
+	lag, err := strconv.ParseInt(string(f), 10, 64)
+	if err != nil {
+		log.Errorf("Runner[%v] Sender[%v] parse lag error: %v", ft.runnerName, ft.innerSender.Name(), err)
+	}
+	return lag
+}
+
+// writeLag persists the send lag into file
+func (ft *FtSender) writeLag(lag int64) error {
+	path := filepath.Join(ft.opt.saveLogPath, LagFilename)
+	file, err := os.OpenFile(path, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0666)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		file.Sync()
+		file.Close()
+	}()
+	lagStr := strconv.FormatInt(lag, 10)
+	_, err = file.WriteString(lagStr)
+	return err
+}
diff --git a/utils/models/models.go b/utils/models/models.go
index e839760bd..843ddf0f7 100644
--- a/utils/models/models.go
+++ b/utils/models/models.go
@@ -186,6 +186,7 @@ type LagInfo struct {
 	Size     int64  `json:"size"`
 	SizeUnit string `json:"sizeunit"`
 	Ftlags   int64  `json:"ftlags"`
+	FtSendLags int64 `json:"ft_send_lags"`
 	Total    int64  `json:"total"`
 }

@@ -205,6 +206,7 @@ type StatsInfo struct {
 	Trend      string `json:"trend"`
 	LastError  string `json:"last_error"`
 	FtQueueLag int64  `json:"-"`
+	FtSendLag  int64  `json:"ft_send_lag"`
 }

 type ErrorStatistic struct {
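
Note for reviewers: the new FtSendLag counter follows a single invariant. It grows when data is accepted into the fault-tolerant queue (RawSend/Send), shrinks, clamped at zero, when queued data is read back and handed to the inner sender (trySendBytes and the readDatasChan paths), resets once both queues drain, and is persisted to meta.lag on Close and restored in newFtSender. Below is a minimal standalone sketch of that accounting, not logkit code; lagTracker and its method names are illustrative only.

// Standalone sketch of the FtSendLag accounting introduced by this patch.
// lagTracker is a hypothetical type for illustration; the real logic lives
// inside FtSender and uses ft.opt.saveLogPath for the lag file.
package main

import (
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"strconv"
	"sync"
)

const lagFilename = "meta.lag" // mirrors LagFilename in the patch

type lagTracker struct {
	mu  sync.Mutex
	lag int64
	dir string
}

// enqueued mirrors RawSend/Send: lag grows when datas enter the ft queue.
func (t *lagTracker) enqueued(n int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.lag += int64(n)
}

// sent mirrors trySendBytes: lag shrinks when queued datas are unmarshaled
// and handed to the inner sender, and is clamped at zero.
func (t *lagTracker) sent(n int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.lag -= int64(n)
	if t.lag < 0 {
		t.lag = 0
	}
}

// save mirrors Close -> writeLag: persist the counter across restarts.
func (t *lagTracker) save() error {
	t.mu.Lock()
	defer t.mu.Unlock()
	data := []byte(strconv.FormatInt(t.lag, 10))
	return ioutil.WriteFile(filepath.Join(t.dir, lagFilename), data, 0666)
}

// load mirrors newFtSender -> readLag: a missing file just means zero lag.
func (t *lagTracker) load() {
	b, err := ioutil.ReadFile(filepath.Join(t.dir, lagFilename))
	if err != nil {
		return
	}
	if v, err := strconv.ParseInt(string(b), 10, 64); err == nil {
		t.mu.Lock()
		t.lag = v
		t.mu.Unlock()
	}
}

func main() {
	t := &lagTracker{dir: os.TempDir()}
	t.load()
	t.enqueued(100) // 100 lines accepted into the queue
	t.sent(40)      // 40 lines drained and sent
	if err := t.save(); err != nil {
		fmt.Println("persist failed:", err)
		return
	}
	fmt.Println("lag persisted to:", filepath.Join(os.TempDir(), lagFilename))
}

One consequence of this design, visible in the diff itself: the counter is written only in Close, so it survives clean restarts but not crashes, and the 10-second sleep at the top of asyncSendLogFromQueue keeps the restored value from being zeroed before the queues report a non-zero depth.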