Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
  • Loading branch information
colixxxx committed Nov 14, 2024
1 parent 0766037 commit c75162a
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 206 deletions.
124 changes: 17 additions & 107 deletions cmd/telegraf/telegraf.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,13 @@ import (
"net/url"
"os"
"os/signal"
"path/filepath"
"regexp"
"strings"
"syscall"
"time"

"github.com/coreos/go-systemd/v22/daemon"
"github.com/fatih/color"
"github.com/influxdata/tail/watch"
"gopkg.in/fsnotify.v1"
"gopkg.in/tomb.v1"

"github.com/influxdata/telegraf"
Expand Down Expand Up @@ -149,7 +146,6 @@ func (t *Telegraf) reloadLoop() error {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGHUP,
syscall.SIGTERM, syscall.SIGINT)

if t.watchConfig != "" {
for _, fConfig := range t.configFiles {
if isURL(fConfig) {
Expand All @@ -159,17 +155,15 @@ func (t *Telegraf) reloadLoop() error {
if _, err := os.Stat(fConfig); err != nil {
log.Printf("W! Cannot watch config %s: %s", fConfig, err)
} else {
go t.watchLocalConfig(signals, fConfig)
go t.watchLocalConfig(ctx, signals, fConfig)
}
}
// tsv: watch config dirs
for _, dir := range t.configDir {
if _, err := os.Stat(dir); err == nil {
go t.watchLocalConfigDir(signals, dir, "\\.watchman-cookie.*")
for _, fConfigDirectory := range t.configDir {
if _, err := os.Stat(fConfigDirectory); err != nil {
log.Printf("W! Cannot watch config directory %s: %s", fConfigDirectory, err)
} else {
log.Printf("W! Cannot watch config dir %s: %s", dir, err)
go t.watchLocalConfig(ctx, signals, fConfigDirectory)
}

}
}
if t.configURLWatchInterval > 0 {
Expand All @@ -183,7 +177,6 @@ func (t *Telegraf) reloadLoop() error {
go t.watchRemoteConfigs(ctx, signals, t.configURLWatchInterval, remoteConfigs)
}
}

go func() {
select {
case sig := <-signals:
Expand Down Expand Up @@ -217,67 +210,7 @@ func (t *Telegraf) reloadLoop() error {
return nil
}

// tsv: watch config directory for new files
func (t *Telegraf) watchLocalConfigDir(signals chan os.Signal, dir, exclusion string) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Printf("E! Error watching config dir: %s\n", err)
return
}
defer watcher.Close()

re := regexp.MustCompile(exclusion)

done := make(chan bool)
go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
continue
}
log.Printf("I! Event watching config dir: %s\n", event)

if re.MatchString(event.Name) {
continue
}

if event.Op&fsnotify.Remove == fsnotify.Remove {
signals <- syscall.SIGHUP
return
}

if event.Op&fsnotify.Create == fsnotify.Create {
signals <- syscall.SIGHUP
return
}

case err, ok := <-watcher.Errors:
if !ok {
continue
}
log.Printf("E! Error watching config dir: %s\n", err)
}
}
}()

err = filepath.Walk(dir, func(path string, f os.FileInfo, err error) error {

if f.IsDir() {
err := watcher.Add(path)
if err != nil {
return err
}
}
return nil
})
if err != nil {
log.Printf("E! Error watching config subdir: %s\n", err)
}
<-done
}

func (t *Telegraf) watchLocalConfig(signals chan os.Signal, fConfig string) {
func (t *Telegraf) watchLocalConfig(ctx context.Context, signals chan os.Signal, fConfig string) {
var mytomb tomb.Tomb
var watcher watch.FileWatcher
if t.watchConfig == "poll" {
Expand All @@ -291,31 +224,30 @@ func (t *Telegraf) watchLocalConfig(signals chan os.Signal, fConfig string) {
}
changes, err := watcher.ChangeEvents(&mytomb, 0)
if err != nil {
log.Printf("E! Error watching config: %s\n", err)
log.Printf("E! Error watching config file/directory %q: %s\n", fConfig, err)
return
}
log.Printf("I! Config watcher started for %s\n", fConfig)
select {
case <-ctx.Done():
mytomb.Done()
return
case <-changes.Modified:
log.Println("I! Config file modified")
log.Printf("I! Config file/directory %q modified\n", fConfig)
case <-changes.Deleted:
// deleted can mean moved. wait a bit a check existence
<-time.After(time.Second)
if _, err := os.Stat(fConfig); err == nil {
log.Printf("I! Config file overwritten %s\n", fConfig)
log.Printf("I! Config file/directory %q overwritten\n", fConfig)
} else {
log.Printf("W! Config file deleted %s\n", fConfig)
// tsv: issue here if several renames happended
/*if err := watcher.BlockUntilExists(&mytomb); err != nil {
log.Printf("E! Cannot watch for config: %s\n", err.Error())
return
}
log.Println("I! Config file appeared")*/
log.Printf("W! Config file/directory %q deleted\n", fConfig)
}
case <-changes.Truncated:
log.Println("I! Config file truncated")
log.Printf("I! Config file/directory %q truncated\n", fConfig)
case <-changes.Created:
log.Printf("I! Config directory %q has new file(s)\n", fConfig)
case <-mytomb.Dying():
log.Println("I! Config watcher ended")
log.Printf("I! Config watcher %q ended\n", fConfig)
return
}
mytomb.Done()
Expand All @@ -338,35 +270,13 @@ func (t *Telegraf) watchRemoteConfigs(ctx context.Context, signals chan os.Signa
return
case <-ticker.C:
for _, configURL := range remoteConfigs {
<<<<<<< HEAD
resp, err := http.Head(configURL) //nolint:gosec // user provided URL
=======
u, err := url.Parse(configURL)
if err != nil {
log.Printf("W! Error parsing config URL, %s: %s\n", configURL, err)
continue
}
rawQuery, err := config.AddHostParams(u)
if err != nil {
log.Printf("W! Error adding params to config URL, %s: %s\n", configURL, err)
continue
}
u.RawQuery = rawQuery
URLparam := u.String()

resp, err := http.Head(URLparam) //nolint: gosec // user provided URL
>>>>>>> 5886
if err != nil {
log.Printf("W! Error fetching config URL, %s: %s\n", configURL, err)
continue
}
resp.Body.Close()

if resp.StatusCode != http.StatusOK {
log.Printf("E! Failed to fetch HTTP config: %s", resp.Status)
continue
}

modified := resp.Header.Get("Last-Modified")
if modified == "" {
log.Printf("E! Last-Modified header not found, stopping the watcher for %s\n", configURL)
Expand Down
43 changes: 20 additions & 23 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/influxdata/telegraf

go 1.23

toolchain go1.23.0
go 1.23.0

require (
cloud.google.com/go/bigquery v1.63.1
Expand Down Expand Up @@ -202,6 +200,7 @@ require (
github.com/tinylib/msgp v1.2.0
github.com/urfave/cli/v2 v2.27.2
github.com/vapourismo/knx-go v0.0.0-20240217175130-922a0d50c241
github.com/vertica/vertica-sql-go v1.3.3
github.com/vishvananda/netlink v1.3.0
github.com/vishvananda/netns v0.0.4
github.com/vjeantet/grok v1.0.1
Expand All @@ -217,14 +216,14 @@ require (
go.opentelemetry.io/proto/otlp v1.3.1
go.starlark.net v0.0.0-20240725214946-42030a7cedce
go.step.sm/crypto v0.51.1
golang.org/x/crypto v0.26.0
golang.org/x/crypto v0.28.0
golang.org/x/mod v0.20.0
golang.org/x/net v0.28.0
golang.org/x/oauth2 v0.22.0
golang.org/x/net v0.30.0
golang.org/x/oauth2 v0.23.0
golang.org/x/sync v0.8.0
golang.org/x/sys v0.24.0
golang.org/x/term v0.23.0
golang.org/x/text v0.17.0
golang.org/x/sys v0.26.0
golang.org/x/term v0.25.0
golang.org/x/text v0.19.0
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20211230205640-daad0b7ba671
gonum.org/v1/gonum v0.15.1
google.golang.org/api v0.203.0
Expand Down Expand Up @@ -252,8 +251,7 @@ require (
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/muhlemmer/gu v0.3.1 // indirect
github.com/rainycape/unidecode v0.0.0-20150907023854-cb7f23ec59be // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/vertica/vertica-sql-go v1.3.3 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/zitadel/logging v0.6.0 // indirect
github.com/zitadel/oidc/v3 v3.26.1 // indirect
github.com/zitadel/schema v1.3.0 // indirect
Expand Down Expand Up @@ -352,7 +350,6 @@ require (
github.com/go-asn1-ber/asn1-ber v1.5.5 // indirect
github.com/go-chi/chi/v5 v5.1.0 // indirect
github.com/go-git/go-billy/v5 v5.5.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.20.2 // indirect
Expand Down Expand Up @@ -503,23 +500,23 @@ require (
go.etcd.io/etcd/api/v3 v3.5.4 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/collector/consumer v0.101.0 // indirect
go.opentelemetry.io/collector/semconv v0.101.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.52.0 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/sdk v1.27.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.opentelemetry.io/collector/semconv v0.105.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect
go.opentelemetry.io/otel v1.29.0 // indirect
go.opentelemetry.io/otel/metric v1.29.0 // indirect
go.opentelemetry.io/otel/sdk v1.29.0 // indirect
go.opentelemetry.io/otel/trace v1.29.0 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/time v0.7.0 // indirect
golang.org/x/tools v0.24.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
golang.zx2c4.com/wireguard v0.0.0-20211209221555-9c9e7e272434 // indirect
google.golang.org/genproto v0.0.0-20240722135656-d784300faade // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240808171019-573a1156607a // indirect
google.golang.org/genproto v0.0.0-20241015192408-796eee8c2d53 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241015192408-796eee8c2d53 // indirect
gopkg.in/fatih/pool.v2 v2.0.0 // indirect
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
Loading

0 comments on commit c75162a

Please sign in to comment.