Skip to content

Commit

Permalink
Merge pull request #109 from TencentBlueKing/feat/diff_check_reload
Browse files Browse the repository at this point in the history
feat: 支持主配置变更重加载
  • Loading branch information
chenjiandongx authored Nov 18, 2024
2 parents 22b0bfd + c4ae351 commit 7b8cede
Show file tree
Hide file tree
Showing 36 changed files with 166 additions and 75 deletions.
77 changes: 73 additions & 4 deletions beater/beater.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,28 @@ package beater

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/beat"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/logp"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/output/gse"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/host"
"github.com/elastic/beats/libbeat/cfgfile"
"github.com/elastic/beats/libbeat/common"
"github.com/pkg/errors"

cfg "github.com/TencentBlueKing/bkunifylogbeat/config"
// 加载 Filebeat Input插件及配置优化模块
_ "github.com/TencentBlueKing/bkunifylogbeat/include"
_ "github.com/TencentBlueKing/bkunifylogbeat/include" // 加载 Filebeat Input插件及配置优化模块
"github.com/TencentBlueKing/bkunifylogbeat/registrar"
"github.com/pkg/errors"
"github.com/TencentBlueKing/bkunifylogbeat/utils"
)

var Registrar *registrar.Registrar

const beatName = "bkunifylogbeat"

// LogBeat package cadvisor
type LogBeat struct {
Name string
Expand All @@ -53,7 +59,8 @@ type LogBeat struct {

hostIDWatcher host.Watcher

isReload bool
isReload bool
lastTaskHash string
}

// New create cadvisor beat
Expand Down Expand Up @@ -118,6 +125,8 @@ func (bt *LogBeat) Run() error {
}

reloadTicker := time.NewTicker(10 * time.Second)
diffTaskTicker := time.NewTicker(60 * time.Second)
defer diffTaskTicker.Stop()
defer reloadTicker.Stop()
for {
select {
Expand All @@ -126,10 +135,23 @@ func (bt *LogBeat) Run() error {
if bt.isReload {
bt.isReload = false
config := beat.GetConfig()
if bt.config.CheckDiff {
config, err = GetRawConfig()
if err != nil {
logp.L.Error(err)
}
}
if config != nil {
bt.Reload(config)
}
}
// 处理采集器主配置是否变更,变更则发送重加载信号
case <-diffTaskTicker.C:
if bt.config.CheckDiff {
if err = bt.checkDiffReload(); err != nil {
logp.L.Error(err)
}
}
case <-beat.ReloadChan:
bt.isReload = true
// 处理采集器框架发送的结束采集器的信号(常由SIGINT引起),关闭采集器
Expand All @@ -148,6 +170,53 @@ func (bt *LogBeat) Stop() {
close(bt.done)
}

// GetRawConfig Get raw main config
func GetRawConfig() (*common.Config, error) {
rawConfig, err := cfgfile.Load("", nil)
if err != nil {
return nil, err
}

if !rawConfig.HasField(beatName) {
return nil, errors.New("no beat name field found")
}

beatConfig, err := rawConfig.Child(beatName, -1)
if err != nil {
return nil, err
}

return beatConfig, nil
}

// Main config diff check
func (bt *LogBeat) checkDiffReload() error {
beatConfig, err := GetRawConfig()
if err != nil {
return err
}
config, err := cfg.Parse(beatConfig)
if err != nil {
return err
}
b, err := json.Marshal(config)
if err != nil {
return err
}

currentTaskHash := utils.Md5(string(b))
if len(bt.lastTaskHash) == 0 {
bt.lastTaskHash = currentTaskHash
}
if bt.lastTaskHash != currentTaskHash {
bt.lastTaskHash = currentTaskHash
bt.Reload(beatConfig)
logp.L.Info("Reload main config task.")
}

return nil
}

// Close cadvisor storage interface
func (bt *LogBeat) Close() error {
return nil
Expand Down
5 changes: 3 additions & 2 deletions beater/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ import (

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/logp"
bkmonitoring "github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/monitoring"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/monitoring"

cfg "github.com/TencentBlueKing/bkunifylogbeat/config"
"github.com/TencentBlueKing/bkunifylogbeat/registrar"
"github.com/TencentBlueKing/bkunifylogbeat/task"
"github.com/TencentBlueKing/bkunifylogbeat/utils"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/libbeat/monitoring"
)

var (
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Config struct {
CmdbLevelMaxLength int `config:"cmdb_level_max_length"`
IgnoreCmdbLevel bool `config:"ignore_cmdb_level"`
MustHostIDExist bool `config:"must_host_id_exist"`
CheckDiff bool `config:"check_diff"`
}

// 从配置目录
Expand Down
3 changes: 2 additions & 1 deletion config/input/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (
"time"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/beat"
cfg "github.com/TencentBlueKing/bkunifylogbeat/config"
"github.com/dustin/go-humanize"

cfg "github.com/TencentBlueKing/bkunifylogbeat/config"
)

// 如果未配置close_inactive则直接默认为5分钟
Expand Down
3 changes: 2 additions & 1 deletion config/input/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import (
"testing"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/logp"
cfg "github.com/TencentBlueKing/bkunifylogbeat/config"
"github.com/elastic/beats/libbeat/common"
libbeatlogp "github.com/elastic/beats/libbeat/logp"
"github.com/stretchr/testify/assert"

cfg "github.com/TencentBlueKing/bkunifylogbeat/config"
)

func init() {
Expand Down
1 change: 1 addition & 0 deletions config/input/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package input

import (
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/beat"

cfg "github.com/TencentBlueKing/bkunifylogbeat/config"
)

Expand Down
6 changes: 4 additions & 2 deletions config/input/wineventlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
package input

import (
"time"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/beat"
cfg "github.com/TencentBlueKing/bkunifylogbeat/config"
"github.com/elastic/beats/libbeat/common"
"time"

cfg "github.com/TencentBlueKing/bkunifylogbeat/config"
)

// WinEventLogConfig window event log collect config
Expand Down
3 changes: 2 additions & 1 deletion config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ import (

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/beat"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/logp"
"github.com/TencentBlueKing/bkunifylogbeat/utils"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"

"github.com/TencentBlueKing/bkunifylogbeat/utils"
)

// ConditionConfig : 用于条件表达式,目前支持=、!=、eq、neq、include、exclude、regex、nregex
Expand Down
13 changes: 1 addition & 12 deletions include/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,14 @@
package include

import (
// input type : kafka
_ "github.com/elastic/beats/filebeat/input/kafka"
// input type : log
_ "github.com/elastic/beats/filebeat/input/log"
// input type : redis
_ "github.com/elastic/beats/filebeat/input/redis"
// input type : stdin
_ "github.com/elastic/beats/filebeat/input/stdin"
// input type : syslog
_ "github.com/elastic/beats/filebeat/input/syslog"
// input type : udp
_ "github.com/elastic/beats/filebeat/input/udp"

// window event log
_ "github.com/TencentBlueKing/bkunifylogbeat/task/input/wineventlog"

// input config
_ "github.com/TencentBlueKing/bkunifylogbeat/config/input"

// formatter
_ "github.com/TencentBlueKing/bkunifylogbeat/task/formatter"
_ "github.com/TencentBlueKing/bkunifylogbeat/task/input/wineventlog"
)
5 changes: 3 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ import (
"os"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/beat"
"github.com/TencentBlueKing/bkunifylogbeat/beater"
_ "github.com/TencentBlueKing/bkunifylogbeat/json"
"github.com/elastic/beats/libbeat/cmd/instance"
"github.com/elastic/beats/libbeat/publisher/processing"

"github.com/TencentBlueKing/bkunifylogbeat/beater"
_ "github.com/TencentBlueKing/bkunifylogbeat/json"
)

var (
Expand Down
5 changes: 3 additions & 2 deletions registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ import (
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/logp"
bkmonitoring "github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/monitoring"
bkStorage "github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/storage"
cfg "github.com/TencentBlueKing/bkunifylogbeat/config"
"github.com/TencentBlueKing/bkunifylogbeat/task/input/wineventlog"
"github.com/elastic/beats/filebeat/input/file"
commonFile "github.com/elastic/beats/libbeat/common/file"
"github.com/elastic/beats/libbeat/monitoring"

cfg "github.com/TencentBlueKing/bkunifylogbeat/config"
"github.com/TencentBlueKing/bkunifylogbeat/task/input/wineventlog"
)

const (
Expand Down
5 changes: 3 additions & 2 deletions registrar/registrar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ import (

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/logp"
bkStorage "github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/storage"
cfg "github.com/TencentBlueKing/bkunifylogbeat/config"
"github.com/TencentBlueKing/bkunifylogbeat/tests"
"github.com/elastic/beats/filebeat/input/file"
libbeatlogp "github.com/elastic/beats/libbeat/logp"
"github.com/stretchr/testify/assert"

cfg "github.com/TencentBlueKing/bkunifylogbeat/config"
"github.com/TencentBlueKing/bkunifylogbeat/tests"
)

func init() {
Expand Down
3 changes: 2 additions & 1 deletion task/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ package base

import (
"fmt"
"sync"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/logp"
bkmonitoring "github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/monitoring"
"github.com/elastic/beats/libbeat/monitoring"
"sync"
)

var (
Expand Down
3 changes: 2 additions & 1 deletion task/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/logp"
bkmonitoring "github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/monitoring"
"github.com/elastic/beats/filebeat/util"

"github.com/TencentBlueKing/bkunifylogbeat/config"
"github.com/TencentBlueKing/bkunifylogbeat/task/base"
"github.com/TencentBlueKing/bkunifylogbeat/task/processor"
"github.com/elastic/beats/filebeat/util"
)

var (
Expand Down
5 changes: 3 additions & 2 deletions task/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
package filter

import (
"testing"
"time"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/beat"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/logp"
libbeatlogp "github.com/elastic/beats/libbeat/logp"
"testing"
"time"

cfg "github.com/TencentBlueKing/bkunifylogbeat/config"
"github.com/TencentBlueKing/bkunifylogbeat/tests"
Expand Down
5 changes: 3 additions & 2 deletions task/formatter/formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ package formatter

import (
"fmt"
"github.com/golang/groupcache/lru"
"path/filepath"
"strconv"
"strings"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/beat"
"github.com/TencentBlueKing/bkunifylogbeat/config"
"github.com/elastic/beats/filebeat/util"
"github.com/golang/groupcache/lru"

"github.com/TencentBlueKing/bkunifylogbeat/config"
)

// Formatter 采集器事件包格式化接口, 根据任务配置返回相应的格式
Expand Down
5 changes: 3 additions & 2 deletions task/formatter/tqos.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
"fmt"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/beat"
"github.com/TencentBlueKing/bkunifylogbeat/config"
"github.com/TencentBlueKing/bkunifylogbeat/utils"
"github.com/elastic/beats/filebeat/util"
"github.com/golang/groupcache/lru"

"github.com/TencentBlueKing/bkunifylogbeat/config"
"github.com/TencentBlueKing/bkunifylogbeat/utils"
)

// TQOSLogConfig 如果未配置close_inactive则直接默认为5分钟
Expand Down
3 changes: 2 additions & 1 deletion task/formatter/tqos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
package formatter

import (
"github.com/TencentBlueKing/bkunifylogbeat/config"
"testing"

"github.com/TencentBlueKing/bkunifylogbeat/config"
)

func TestNewTQOSFormatter(t *testing.T) {
Expand Down
5 changes: 3 additions & 2 deletions task/formatter/unifytlogc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ import (
"fmt"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/libgse/beat"
"github.com/TencentBlueKing/bkunifylogbeat/config"
"github.com/TencentBlueKing/bkunifylogbeat/utils"
"github.com/elastic/beats/filebeat/util"
"github.com/golang/groupcache/lru"

"github.com/TencentBlueKing/bkunifylogbeat/config"
"github.com/TencentBlueKing/bkunifylogbeat/utils"
)

// LogConfig 如果未配置close_inactive则直接默认为5分钟
Expand Down
3 changes: 2 additions & 1 deletion task/formatter/unifytlogc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ import (
"testing"
"time"

"github.com/TencentBlueKing/bkunifylogbeat/config"
"github.com/elastic/beats/filebeat/input/file"
"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/stretchr/testify/assert"

"github.com/TencentBlueKing/bkunifylogbeat/config"
)

func BaseFormatter(t *testing.T, taskConfig *config.TaskConfig) {
Expand Down
Loading

0 comments on commit 7b8cede

Please sign in to comment.