Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

override log shipping level on start up #1467

Merged
merged 6 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ func runLauncher(ctx context.Context, cancel func(), slogger, systemSlogger *mul
}))
}

// Run launcher in debug mode for first 10 minutes. Intentionally doing this after setting up
// stderr debug logger becaues we don't want to write evertyhing to stderr.
k.SetLogShippingLevelOverride("debug", 10*time.Minute)

// Need to set up the log shipper so that we can get the logger early
// and pass it to the various systems.
var logShipper *logshipper.LogShipper
Expand Down Expand Up @@ -357,7 +361,6 @@ func runLauncher(ctx context.Context, cancel func(), slogger, systemSlogger *mul
if logShipper != nil {
runGroup.Add("logShipper", logShipper.Run, logShipper.Stop)
controlService.RegisterSubscriber(authTokensSubsystemName, logShipper)
controlService.RegisterSubscriber(agentFlagsSubsystemName, logShipper)
}

if metadataWriter := internal.NewMetadataWriter(logger, k); metadataWriter == nil {
Expand Down
93 changes: 57 additions & 36 deletions pkg/agent/flags/flag_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@ import (
// FlagController is responsible for retrieving flag values from the appropriate sources,
// determining precedence, sanitizing flag values, and notifying observers of changes.
type FlagController struct {
logger log.Logger
cmdLineOpts *launcher.Options
agentFlagsStore types.KVStore
overrideMutex sync.RWMutex
controlRequestOverride FlagValueOverride
observers map[types.FlagsChangeObserver][]keys.FlagKey
observersMutex sync.RWMutex
logger log.Logger
cmdLineOpts *launcher.Options
agentFlagsStore types.KVStore
overrideMutex sync.RWMutex
overrides map[keys.FlagKey]*Override
observers map[types.FlagsChangeObserver][]keys.FlagKey
observersMutex sync.RWMutex
}

func NewFlagController(logger log.Logger, agentFlagsStore types.KVStore, opts ...Option) *FlagController {
fc := &FlagController{
logger: logger,
logger: log.With(logger, "component", "flag_controller"),
cmdLineOpts: &launcher.Options{},
agentFlagsStore: agentFlagsStore,
observers: make(map[types.FlagsChangeObserver][]keys.FlagKey),
overrides: make(map[keys.FlagKey]*Override),
}

for _, opt := range opts {
Expand Down Expand Up @@ -114,6 +115,44 @@ func (fc *FlagController) notifyObservers(flagKeys ...keys.FlagKey) {
}
}

func (fc *FlagController) overrideFlag(key keys.FlagKey, duration time.Duration, value any) {
// Always notify observers when overrides start, so they know to refresh.
// Defering this before defering unlocking the mutex so that notifications occur outside of the critical section.
defer fc.notifyObservers(key)

fc.overrideMutex.Lock()
defer fc.overrideMutex.Unlock()

level.Info(fc.logger).Log(
"msg", "overriding flag",
"key", key,
"value", value,
"duration", duration,
)

override, ok := fc.overrides[key]
if !ok || override.Value() == nil {
// Creating the override implicitly causes future flag value retrievals to use the override until expiration
override = &Override{}
fc.overrides[key] = override
}

overrideExpired := func(key keys.FlagKey) {
// Always notify observers when overrides expire, so they know to refresh.
// Defering this before defering unlocking the mutex so that notifications occur outside of the critical section.
defer fc.notifyObservers(key)

fc.overrideMutex.Lock()
defer fc.overrideMutex.Unlock()

// Deleting the override implictly allows the next value to take precedence
delete(fc.overrides, key)
}

// Start a new override, or re-start an existing one with a new value, duration, and expiration
fc.overrides[key].Start(key, value, duration, overrideExpired)
}

func (fc *FlagController) AutoloadedExtensions() []string {
return fc.cmdLineOpts.AutoloadedExtensions
}
Expand Down Expand Up @@ -250,40 +289,15 @@ func (fc *FlagController) ControlServerURL() string {
func (fc *FlagController) SetControlRequestInterval(interval time.Duration) error {
return fc.setControlServerValue(keys.ControlRequestInterval, durationToBytes(interval))
}
func (fc *FlagController) SetControlRequestIntervalOverride(interval, duration time.Duration) {
// Always notify observers when overrides start, so they know to refresh.
// Defering this before defering unlocking the mutex so that notifications occur outside of the critical section.
defer fc.notifyObservers(keys.ControlRequestInterval)

fc.overrideMutex.Lock()
defer fc.overrideMutex.Unlock()

if fc.controlRequestOverride == nil || fc.controlRequestOverride.Value() == nil {
// Creating the override implicitly causes future ControlRequestInterval retrievals to use the override until expiration
fc.controlRequestOverride = &Override{}
}

overrideExpired := func(key keys.FlagKey) {
// Always notify observers when overrides expire, so they know to refresh.
// Defering this before defering unlocking the mutex so that notifications occur outside of the critical section.
defer fc.notifyObservers(key)

fc.overrideMutex.Lock()
defer fc.overrideMutex.Unlock()

// Deleting the override implictly allows the next value to take precedence
fc.controlRequestOverride = nil
}

// Start a new override, or re-start an existing one with a new value, duration, and expiration
fc.controlRequestOverride.Start(keys.ControlRequestInterval, interval, duration, overrideExpired)
func (fc *FlagController) SetControlRequestIntervalOverride(value time.Duration, duration time.Duration) {
fc.overrideFlag(keys.ControlRequestInterval, duration, value)
}
func (fc *FlagController) ControlRequestInterval() time.Duration {
fc.overrideMutex.RLock()
defer fc.overrideMutex.RUnlock()

return NewDurationFlagValue(fc.logger, keys.ControlRequestInterval,
WithOverride(fc.controlRequestOverride),
WithOverride(fc.overrides[keys.ControlRequestInterval]),
WithDefault(fc.cmdLineOpts.ControlRequestInterval),
WithMin(5*time.Second),
WithMax(10*time.Minute),
Expand Down Expand Up @@ -490,10 +504,17 @@ func (fc *FlagController) LogIngestServerURL() string {
func (fc *FlagController) SetLogShippingLevel(level string) error {
return fc.setControlServerValue(keys.LogShippingLevel, []byte(level))
}
func (fc *FlagController) SetLogShippingLevelOverride(value string, duration time.Duration) {
fc.overrideFlag(keys.LogShippingLevel, duration, value)
}
func (fc *FlagController) LogShippingLevel() string {
fc.overrideMutex.RLock()
defer fc.overrideMutex.RUnlock()

const defaultLevel = "info"

return NewStringFlagValue(
WithOverrideString(fc.overrides[keys.LogShippingLevel]),
WithDefaultString(defaultLevel),
WithSanitizer(func(value string) string {
value = strings.ToLower(value)
Expand Down
16 changes: 16 additions & 0 deletions pkg/agent/flags/flag_value_string.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package flags

type stringFlagValueOption func(*stringFlagValue)

func WithOverrideString(override FlagValueOverride) stringFlagValueOption {
return func(s *stringFlagValue) {
s.override = override
}
}

func WithDefaultString(defaultVal string) stringFlagValueOption {
return func(s *stringFlagValue) {
s.defaultVal = defaultVal
Expand All @@ -17,6 +23,7 @@ func WithSanitizer(sanitizer func(value string) string) stringFlagValueOption {
type stringFlagValue struct {
defaultVal string
sanitizer func(value string) string
override FlagValueOverride
}

func NewStringFlagValue(opts ...stringFlagValueOption) *stringFlagValue {
Expand All @@ -35,9 +42,18 @@ func (s *stringFlagValue) get(controlServerValue []byte) string {
stringValue = string(controlServerValue)
}

if s.override != nil && s.override.Value() != nil {
// An override was provided, if it's valid let it take precedence
value, ok := s.override.Value().(string)
if ok {
stringValue = value
}
}

// Run the string through a sanitizer, if one was provided
if s.sanitizer != nil {
stringValue = s.sanitizer(stringValue)
}

return stringValue
}
11 changes: 9 additions & 2 deletions pkg/agent/flags/flag_value_string_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package flags
import (
"testing"

"github.com/kolide/launcher/pkg/agent/flags/mocks"
"github.com/stretchr/testify/assert"
)

func TestFlagValueString(t *testing.T) {
t.Parallel()

// mockOverride := mocks.NewFlagValueOverride(t)
// mockOverride.On("Value").Return(7 * time.Second)
mockOverride := mocks.NewFlagValueOverride(t)
mockOverride.On("Value").Return("override_value")

tests := []struct {
name string
Expand Down Expand Up @@ -52,6 +53,12 @@ func TestFlagValueString(t *testing.T) {
controlServerValue: []byte("control-server-says-this"),
expected: "SANITIZED control-server-says-this",
},
{
name: "control server with override",
options: []stringFlagValueOption{WithDefaultString("default_value"), WithOverrideString(mockOverride)},
controlServerValue: []byte("enabled"),
expected: mockOverride.Value().(string),
},
}
for _, tt := range tests {
tt := tt
Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/knapsack/knapsack.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,9 @@ func (k *knapsack) LogIngestServerURL() string {
func (k *knapsack) SetLogShippingLevel(level string) error {
return k.flags.SetLogShippingLevel(level)
}
func (k *knapsack) SetLogShippingLevelOverride(value string, duration time.Duration) {
k.flags.SetLogShippingLevelOverride(value, duration)
}
func (k *knapsack) LogShippingLevel() string {
return k.flags.LogShippingLevel()
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/types/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type Flags interface {
// ControlRequestInterval is the interval at which control client will check for updates from the control server.
SetControlRequestInterval(interval time.Duration) error
// SetControlRequestIntervalOverride stores an interval to be temporarily used as an override of any other interval, until the duration has elapased.
SetControlRequestIntervalOverride(interval, duration time.Duration)
SetControlRequestIntervalOverride(value time.Duration, duration time.Duration)
ControlRequestInterval() time.Duration

// DisableControlTLS disables TLS transport with the control server.
Expand Down Expand Up @@ -183,6 +183,7 @@ type Flags interface {

// LogShippingLevel is the level at which logs should be shipped to the server
SetLogShippingLevel(level string) error
SetLogShippingLevelOverride(value string, duration time.Duration)
LogShippingLevel() string

// TraceIngestServerURL is the URL of the ingest server for traces
Expand Down
11 changes: 8 additions & 3 deletions pkg/agent/types/mocks/knapsack.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 18 additions & 6 deletions pkg/log/logshipper/logshipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/kolide/kit/ulid"
"github.com/kolide/kit/version"
"github.com/kolide/launcher/pkg/agent/flags/keys"
"github.com/kolide/launcher/pkg/agent/storage"
"github.com/kolide/launcher/pkg/agent/types"
"github.com/kolide/launcher/pkg/sendbuffer"
Expand All @@ -21,7 +22,7 @@ import (
const (
truncatedFormatString = "%s[TRUNCATED]"
defaultSendInterval = 1 * time.Minute
debugSendInterval = 1 * time.Second
debugSendInterval = 5 * time.Second
)

type LogShipper struct {
Expand All @@ -43,10 +44,6 @@ func New(k types.Knapsack, baseLogger log.Logger) *LogShipper {
sender := newAuthHttpSender()

sendInterval := defaultSendInterval
if k.Debug() {
sendInterval = debugSendInterval
}

sendBuffer := sendbuffer.New(sender, sendbuffer.WithSendInterval(sendInterval))

// setting a ulid as session_ulid allows us to follow a single run of launcher
Expand All @@ -63,12 +60,20 @@ func New(k types.Knapsack, baseLogger log.Logger) *LogShipper {
}

ls.slogLevel = new(slog.LevelVar)
ls.slogLevel.Set(slog.LevelError)
ls.slogLevel.Set(slog.LevelInfo)

ls.knapsack.RegisterChangeObserver(ls, keys.LogShippingLevel, keys.LogIngestServerURL)

ls.Ping()
return ls
}

func (ls *LogShipper) FlagsChanged(flagKeys ...keys.FlagKey) {
// TODO: only make updates that are relevant to flag key changes
Copy link
Contributor Author

@James-Pickett James-Pickett Nov 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will address this in follow on PR, it doesn't make anything worse ... maybe even slightly better =)

// calling ping does more work than needed
ls.Ping()
}

// Ping gets the latest token and endpoint from knapsack and updates the sender
func (ls *LogShipper) Ping() {
// set up new auth token
Expand All @@ -91,9 +96,13 @@ func (ls *LogShipper) Ping() {
}

startingLevel := ls.slogLevel.Level()
sendInterval := defaultSendInterval

switch ls.knapsack.LogShippingLevel() {
case "debug":
ls.slogLevel.Set(slog.LevelDebug)
// if we using debug level logging, send logs more frequently
sendInterval = debugSendInterval
case "info":
ls.slogLevel.Set(slog.LevelInfo)
case "warn":
Expand All @@ -111,9 +120,12 @@ func (ls *LogShipper) Ping() {
ls.knapsack.Slogger().Info("log shipping level changed",
"old_log_level", startingLevel.String(),
"new_log_level", ls.slogLevel.Level().String(),
"send_interval", sendInterval.String(),
)
}

ls.sendBuffer.SetSendInterval(sendInterval)

ls.isShippingEnabled = ls.sender.endpoint != ""
ls.addDeviceIdentifyingAttributesToLogger()

Expand Down
Loading
Loading