From e530c234b52606f0f5cdbebc3e6f7f4af69938c3 Mon Sep 17 00:00:00 2001 From: zackattack01 Date: Fri, 17 May 2024 15:29:16 -0400 Subject: [PATCH] rework logstore interfaces, break off from kevalue, bugfixes --- cmd/launcher/watchdog/controller_windows.go | 9 +- .../storage/sqlite/keyvalue_store_sqlite.go | 113 ----------------- ee/agent/storage/sqlite/logstore_sqlite.go | 120 ++++++++++++++++++ .../storage/sqlite/logstore_sqlite_test.go | 32 +++++ ee/agent/types/keyvalue_store.go | 34 ----- ee/agent/types/logstore.go | 35 +++++ 6 files changed, 195 insertions(+), 148 deletions(-) create mode 100644 ee/agent/storage/sqlite/logstore_sqlite.go create mode 100644 ee/agent/storage/sqlite/logstore_sqlite_test.go create mode 100644 ee/agent/types/logstore.go diff --git a/cmd/launcher/watchdog/controller_windows.go b/cmd/launcher/watchdog/controller_windows.go index 82365cb2fe..cf1bf76a2e 100644 --- a/cmd/launcher/watchdog/controller_windows.go +++ b/cmd/launcher/watchdog/controller_windows.go @@ -291,7 +291,14 @@ func removeService(serviceManager *mgr.Mgr, serviceName string) error { func (wc *WatchdogController) restartService(service *mgr.Service) error { status, err := service.Control(svc.Stop) if err != nil { - return fmt.Errorf("stopping %s service: %w", service.Name, err) + wc.slogger.Log(context.TODO(), slog.LevelWarn, + "error stopping service", + "err", err, + ) + + // always attempt to start the service regardless, if the service was already + // stopped it will still err on the control (stop) call above + return service.Start() } timeout := time.Now().Add(10 * time.Second) diff --git a/ee/agent/storage/sqlite/keyvalue_store_sqlite.go b/ee/agent/storage/sqlite/keyvalue_store_sqlite.go index 314f1ef760..c31d0deee7 100644 --- a/ee/agent/storage/sqlite/keyvalue_store_sqlite.go +++ b/ee/agent/storage/sqlite/keyvalue_store_sqlite.go @@ -325,116 +325,3 @@ ON CONFLICT (name) DO UPDATE SET value=excluded.value;` return deletedKeys, nil } - -func (s *sqliteStore) getColumns() *sqliteColumns { - switch s.tableName { - case StartupSettingsStore.String(): - return &sqliteColumns{pk: "name", valueColumn: "value"} - case WatchdogLogStore.String(): - return &sqliteColumns{pk: "timestamp", valueColumn: "log"} - } - - return nil -} - -func (s *sqliteStore) AppendValue(timestamp int64, value []byte) error { - colInfo := s.getColumns() - if s == nil || s.conn == nil || colInfo == nil { - return errors.New("store is nil") - } - - if s.readOnly { - return errors.New("cannot perform update with RO connection") - } - - insertSql := fmt.Sprintf( - `INSERT INTO %s (%s, %s) VALUES (?, ?)`, - s.tableName, - colInfo.pk, - colInfo.valueColumn, - ) - - if _, err := s.conn.Exec(insertSql, timestamp, value); err != nil { - return fmt.Errorf("appending row into %s: %w", s.tableName, err) - } - - return nil -} - -func (s *sqliteStore) DeleteRows(rowids ...any) error { - if s == nil || s.conn == nil { - return errors.New("store is nil") - } - - if s.readOnly { - return errors.New("cannot perform deletes with RO connection") - } - - if len(rowids) == 0 { - return nil - } - - // interpolate the proper number of question marks - paramQs := strings.Repeat("?,", len(rowids)) - paramQs = paramQs[:len(paramQs)-1] - deleteSql := fmt.Sprintf(`DELETE FROM %s WHERE rowid IN (%s)`, s.tableName, paramQs) - - if _, err := s.conn.Exec(deleteSql, rowids...); err != nil { - return fmt.Errorf("deleting row from %s: %w", s.tableName, err) - } - - return nil -} - -func (s *sqliteStore) ForEach(fn func(rowid, timestamp int64, v []byte) error) error { - colInfo := s.getColumns() - if s == nil || s.conn == nil || colInfo == nil { - return errors.New("store is nil") - } - - query := fmt.Sprintf( - `SELECT rowid, %s, %s FROM %s;`, - colInfo.pk, - colInfo.valueColumn, - s.tableName, - ) - - rows, err := s.conn.Query(query) - if err != nil { - return fmt.Errorf("issuing foreach query: %w", err) - } - - defer rows.Close() - - for rows.Next() { - var rowid int64 - var timestamp int64 - var result string - if err := rows.Scan(&rowid, ×tamp, &result); err != nil { - return fmt.Errorf("scanning foreach query: %w", err) - } - - if err := fn(rowid, timestamp, []byte(result)); err != nil { - return fmt.Errorf("caller error during foreach iteration: %w", err) - } - } - - return nil -} - -func (s *sqliteStore) Count() (int, error) { - if s == nil || s.conn == nil { - return 0, errors.New("store is nil") - } - - // It's fine to interpolate the table name into the query because - // we require the table name to be in our allowlist `supportedTables` - query := fmt.Sprintf(`SELECT COUNT(*) FROM %s;`, s.tableName) - - var countValue int - if err := s.conn.QueryRow(query).Scan(&countValue); err != nil { - return 0, fmt.Errorf("querying for %s table count: %w", s.tableName, err) - } - - return countValue, nil -} diff --git a/ee/agent/storage/sqlite/logstore_sqlite.go b/ee/agent/storage/sqlite/logstore_sqlite.go new file mode 100644 index 0000000000..0e04e32fb7 --- /dev/null +++ b/ee/agent/storage/sqlite/logstore_sqlite.go @@ -0,0 +1,120 @@ +package agentsqlite + +import ( + "errors" + "fmt" + "strings" +) + +func (s *sqliteStore) getColumns() *sqliteColumns { + switch s.tableName { + case StartupSettingsStore.String(): + return &sqliteColumns{pk: "name", valueColumn: "value"} + case WatchdogLogStore.String(): + return &sqliteColumns{pk: "timestamp", valueColumn: "log"} + } + + return nil +} + +func (s *sqliteStore) AppendValue(timestamp int64, value []byte) error { + colInfo := s.getColumns() + if s == nil || s.conn == nil || colInfo == nil { + return errors.New("store is nil") + } + + if s.readOnly { + return errors.New("cannot perform update with RO connection") + } + + insertSql := fmt.Sprintf( + `INSERT INTO %s (%s, %s) VALUES (?, ?)`, + s.tableName, + colInfo.pk, + colInfo.valueColumn, + ) + + if _, err := s.conn.Exec(insertSql, timestamp, value); err != nil { + return fmt.Errorf("appending row into %s: %w", s.tableName, err) + } + + return nil +} + +func (s *sqliteStore) DeleteRows(rowids ...any) error { + if s == nil || s.conn == nil { + return errors.New("store is nil") + } + + if s.readOnly { + return errors.New("cannot perform deletes with RO connection") + } + + if len(rowids) == 0 { + return nil + } + + // interpolate the proper number of question marks + paramQs := strings.Repeat("?,", len(rowids)) + paramQs = paramQs[:len(paramQs)-1] + deleteSql := fmt.Sprintf(`DELETE FROM %s WHERE rowid IN (%s)`, s.tableName, paramQs) + + if _, err := s.conn.Exec(deleteSql, rowids...); err != nil { + return fmt.Errorf("deleting row from %s: %w", s.tableName, err) + } + + return nil +} + +func (s *sqliteStore) ForEach(fn func(rowid, timestamp int64, v []byte) error) error { + colInfo := s.getColumns() + if s == nil || s.conn == nil || colInfo == nil { + return errors.New("store is nil") + } + + query := fmt.Sprintf( + `SELECT rowid, %s, %s FROM %s;`, + colInfo.pk, + colInfo.valueColumn, + s.tableName, + ) + + rows, err := s.conn.Query(query) + if err != nil { + return fmt.Errorf("issuing foreach query: %w", err) + } + + defer rows.Close() + + for rows.Next() { + var rowid int64 + var timestamp int64 + var result string + if err := rows.Scan(&rowid, ×tamp, &result); err != nil { + return fmt.Errorf("scanning foreach query: %w", err) + } + + if err := fn(rowid, timestamp, []byte(result)); err != nil { + return fmt.Errorf("caller error during foreach iteration: %w", err) + } + } + + return nil +} + +func (s *sqliteStore) Count() (int, error) { + if s == nil || s.conn == nil { + return 0, errors.New("store is nil") + } + + // It's fine to interpolate the table name into the query because + // we require the table name to be in our allowlist `supportedTables` + query := fmt.Sprintf(`SELECT COUNT(*) FROM %s;`, s.tableName) + + var countValue int + if err := s.conn.QueryRow(query).Scan(&countValue); err != nil { + return 0, fmt.Errorf("querying for %s table count: %w", s.tableName, err) + } + + return countValue, nil +} diff --git a/ee/agent/storage/sqlite/logstore_sqlite_test.go b/ee/agent/storage/sqlite/logstore_sqlite_test.go new file mode 100644 index 0000000000..6d32152ed7 --- /dev/null +++ b/ee/agent/storage/sqlite/logstore_sqlite_test.go @@ -0,0 +1,32 @@ +package agentsqlite + +import ( + "context" + "testing" + "time" + + "github.com/kolide/launcher/ee/agent/flags/keys" + "github.com/stretchr/testify/require" +) + +func TestAppendValue(t *testing.T) { + t.Parallel() + + testRootDir := t.TempDir() + + s, err := OpenRW(context.TODO(), testRootDir, WatchdogLogStore) + require.NoError(t, err, "creating test store") + + flagKey := []byte(keys.UpdateChannel.String()) + flagVal := []byte("beta") + + startTime := time.Now().Unix() + logEntry := `{"time":"2024-05-13T18:29:31.7829101Z", "msg":"testmsg"}` + require.NoError(t, s.AppendValue(startTime, []byte(logEntry)), "expected no error appending value row") + + returnedVal, err := s.Get(flagKey) + require.NoError(t, err, "expected no error getting value") + require.Equal(t, flagVal, returnedVal, "flag value mismatch") + + require.NoError(t, s.Close()) +} diff --git a/ee/agent/types/keyvalue_store.go b/ee/agent/types/keyvalue_store.go index 7ea7b483b2..7cdda32c1c 100644 --- a/ee/agent/types/keyvalue_store.go +++ b/ee/agent/types/keyvalue_store.go @@ -24,11 +24,6 @@ type Deleter interface { DeleteAll() error } -// RowDeleter is an interface for deleting rows by rowid in a sql store -type RowDeleter interface { - DeleteRows(rowids ...any) error -} - // Iterator is an interface for iterating data in a key/value store. type Iterator interface { // ForEach executes a function for each key/value pair in a store. @@ -38,21 +33,6 @@ type Iterator interface { ForEach(fn func(k, v []byte) error) error } -// TimestampedIterator is a read-only interface for iterating timestamped data. -type TimestampedIterator interface { - // ForEach executes a function for each timestamp/value pair in a store. - // If the provided function returns an error then the iteration is stopped and - // the error is returned to the caller. The provided function must not modify - // the store; this will result in undefined behavior. - ForEach(fn func(rowid, timestamp int64, v []byte) error) error -} - -// TimestampedAppender is an interface for supporting the addition of timestamped values to a store -type TimestampedAppender interface { - // AppendValue takes the timestamp, and marshalled value for insertion as a new row - AppendValue(timestamp int64, value []byte) error -} - // Updater is an interface for bulk replacing data in a key/value store. type Updater interface { // Update takes a map of key-value pairs, and inserts @@ -123,19 +103,5 @@ type GetterSetterDeleterIteratorUpdaterCounterAppender interface { Appender } -// TimestampedIteratorDeleterAppenderCounterCloser is an interface to support the storage and retrieval of -// sets of timestamped values. This can be used where a strict key/value interface may not suffice, -// e.g. for writing logs or historical records to sqlite -type TimestampedIteratorDeleterAppenderCounterCloser interface { - TimestampedIterator - TimestampedAppender - Counter - RowDeleter - Closer -} - -// LogStore is a convenient alias for a store that supports all methods required to manipulate sqlite logs -type LogStore = TimestampedIteratorDeleterAppenderCounterCloser - // Convenient alias for a key value store that supports all methods type KVStore = GetterSetterDeleterIteratorUpdaterCounterAppender diff --git a/ee/agent/types/logstore.go b/ee/agent/types/logstore.go new file mode 100644 index 0000000000..ae0949e14c --- /dev/null +++ b/ee/agent/types/logstore.go @@ -0,0 +1,35 @@ +package types + +// RowDeleter is an interface for deleting rows by rowid in a sql store +type RowDeleter interface { + DeleteRows(rowids ...any) error +} + +// TimestampedIterator is a read-only interface for iterating timestamped data. +type TimestampedIterator interface { + // ForEach executes a function for each timestamp/value pair in a store. + // If the provided function returns an error then the iteration is stopped and + // the error is returned to the caller. The provided function must not modify + // the store; this will result in undefined behavior. + ForEach(fn func(rowid, timestamp int64, v []byte) error) error +} + +// TimestampedAppender is an interface for supporting the addition of timestamped values to a store +type TimestampedAppender interface { + // AppendValue takes the timestamp, and marshalled value for insertion as a new row + AppendValue(timestamp int64, value []byte) error +} + +// TimestampedIteratorDeleterAppenderCounterCloser is an interface to support the storage and retrieval of +// sets of timestamped values. This can be used where a strict key/value interface may not suffice, +// e.g. for writing logs or historical records to sqlite +type TimestampedIteratorDeleterAppenderCounterCloser interface { + TimestampedIterator + TimestampedAppender + Counter + RowDeleter + Closer +} + +// LogStore is a convenient alias for a store that supports all methods required to manipulate sqlite logs +type LogStore = TimestampedIteratorDeleterAppenderCounterCloser