Skip to content

Commit

Permalink
rework logstore interfaces, break off from kevalue, bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
zackattack01 committed May 17, 2024
1 parent afab399 commit e530c23
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 148 deletions.
9 changes: 8 additions & 1 deletion cmd/launcher/watchdog/controller_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,14 @@ func removeService(serviceManager *mgr.Mgr, serviceName string) error {
func (wc *WatchdogController) restartService(service *mgr.Service) error {
status, err := service.Control(svc.Stop)
if err != nil {
return fmt.Errorf("stopping %s service: %w", service.Name, err)
wc.slogger.Log(context.TODO(), slog.LevelWarn,
"error stopping service",
"err", err,
)

// always attempt to start the service regardless, if the service was already
// stopped it will still err on the control (stop) call above
return service.Start()
}

timeout := time.Now().Add(10 * time.Second)
Expand Down
113 changes: 0 additions & 113 deletions ee/agent/storage/sqlite/keyvalue_store_sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,116 +325,3 @@ ON CONFLICT (name) DO UPDATE SET value=excluded.value;`

return deletedKeys, nil
}

func (s *sqliteStore) getColumns() *sqliteColumns {
switch s.tableName {
case StartupSettingsStore.String():
return &sqliteColumns{pk: "name", valueColumn: "value"}
case WatchdogLogStore.String():
return &sqliteColumns{pk: "timestamp", valueColumn: "log"}
}

return nil
}

func (s *sqliteStore) AppendValue(timestamp int64, value []byte) error {
colInfo := s.getColumns()
if s == nil || s.conn == nil || colInfo == nil {
return errors.New("store is nil")
}

if s.readOnly {
return errors.New("cannot perform update with RO connection")
}

insertSql := fmt.Sprintf(
`INSERT INTO %s (%s, %s) VALUES (?, ?)`,
s.tableName,
colInfo.pk,
colInfo.valueColumn,
)

if _, err := s.conn.Exec(insertSql, timestamp, value); err != nil {
return fmt.Errorf("appending row into %s: %w", s.tableName, err)
}

return nil
}

func (s *sqliteStore) DeleteRows(rowids ...any) error {
if s == nil || s.conn == nil {
return errors.New("store is nil")
}

if s.readOnly {
return errors.New("cannot perform deletes with RO connection")
}

if len(rowids) == 0 {
return nil
}

// interpolate the proper number of question marks
paramQs := strings.Repeat("?,", len(rowids))
paramQs = paramQs[:len(paramQs)-1]
deleteSql := fmt.Sprintf(`DELETE FROM %s WHERE rowid IN (%s)`, s.tableName, paramQs)

if _, err := s.conn.Exec(deleteSql, rowids...); err != nil {
return fmt.Errorf("deleting row from %s: %w", s.tableName, err)
}

return nil
}

func (s *sqliteStore) ForEach(fn func(rowid, timestamp int64, v []byte) error) error {
colInfo := s.getColumns()
if s == nil || s.conn == nil || colInfo == nil {
return errors.New("store is nil")
}

query := fmt.Sprintf(
`SELECT rowid, %s, %s FROM %s;`,
colInfo.pk,
colInfo.valueColumn,
s.tableName,
)

rows, err := s.conn.Query(query)
if err != nil {
return fmt.Errorf("issuing foreach query: %w", err)
}

defer rows.Close()

for rows.Next() {
var rowid int64
var timestamp int64
var result string
if err := rows.Scan(&rowid, &timestamp, &result); err != nil {
return fmt.Errorf("scanning foreach query: %w", err)
}

if err := fn(rowid, timestamp, []byte(result)); err != nil {
return fmt.Errorf("caller error during foreach iteration: %w", err)
}
}

return nil
}

func (s *sqliteStore) Count() (int, error) {
if s == nil || s.conn == nil {
return 0, errors.New("store is nil")
}

// It's fine to interpolate the table name into the query because
// we require the table name to be in our allowlist `supportedTables`
query := fmt.Sprintf(`SELECT COUNT(*) FROM %s;`, s.tableName)

var countValue int
if err := s.conn.QueryRow(query).Scan(&countValue); err != nil {
return 0, fmt.Errorf("querying for %s table count: %w", s.tableName, err)
}

return countValue, nil
}
120 changes: 120 additions & 0 deletions ee/agent/storage/sqlite/logstore_sqlite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package agentsqlite

import (
"errors"
"fmt"
"strings"
)

func (s *sqliteStore) getColumns() *sqliteColumns {
switch s.tableName {
case StartupSettingsStore.String():
return &sqliteColumns{pk: "name", valueColumn: "value"}
case WatchdogLogStore.String():
return &sqliteColumns{pk: "timestamp", valueColumn: "log"}
}

return nil
}

func (s *sqliteStore) AppendValue(timestamp int64, value []byte) error {
colInfo := s.getColumns()
if s == nil || s.conn == nil || colInfo == nil {
return errors.New("store is nil")
}

if s.readOnly {
return errors.New("cannot perform update with RO connection")
}

insertSql := fmt.Sprintf(
`INSERT INTO %s (%s, %s) VALUES (?, ?)`,
s.tableName,
colInfo.pk,
colInfo.valueColumn,
)

if _, err := s.conn.Exec(insertSql, timestamp, value); err != nil {
return fmt.Errorf("appending row into %s: %w", s.tableName, err)
}

return nil
}

func (s *sqliteStore) DeleteRows(rowids ...any) error {
if s == nil || s.conn == nil {
return errors.New("store is nil")
}

if s.readOnly {
return errors.New("cannot perform deletes with RO connection")
}

if len(rowids) == 0 {
return nil
}

// interpolate the proper number of question marks
paramQs := strings.Repeat("?,", len(rowids))
paramQs = paramQs[:len(paramQs)-1]
deleteSql := fmt.Sprintf(`DELETE FROM %s WHERE rowid IN (%s)`, s.tableName, paramQs)

if _, err := s.conn.Exec(deleteSql, rowids...); err != nil {
return fmt.Errorf("deleting row from %s: %w", s.tableName, err)
}

return nil
}

func (s *sqliteStore) ForEach(fn func(rowid, timestamp int64, v []byte) error) error {
colInfo := s.getColumns()
if s == nil || s.conn == nil || colInfo == nil {
return errors.New("store is nil")
}

query := fmt.Sprintf(
`SELECT rowid, %s, %s FROM %s;`,
colInfo.pk,
colInfo.valueColumn,
s.tableName,
)

rows, err := s.conn.Query(query)
if err != nil {
return fmt.Errorf("issuing foreach query: %w", err)
}

defer rows.Close()

for rows.Next() {
var rowid int64
var timestamp int64
var result string
if err := rows.Scan(&rowid, &timestamp, &result); err != nil {
return fmt.Errorf("scanning foreach query: %w", err)
}

if err := fn(rowid, timestamp, []byte(result)); err != nil {
return fmt.Errorf("caller error during foreach iteration: %w", err)
}
}

return nil
}

func (s *sqliteStore) Count() (int, error) {
if s == nil || s.conn == nil {
return 0, errors.New("store is nil")
}

// It's fine to interpolate the table name into the query because
// we require the table name to be in our allowlist `supportedTables`
query := fmt.Sprintf(`SELECT COUNT(*) FROM %s;`, s.tableName)

var countValue int
if err := s.conn.QueryRow(query).Scan(&countValue); err != nil {
return 0, fmt.Errorf("querying for %s table count: %w", s.tableName, err)
}

return countValue, nil
}
32 changes: 32 additions & 0 deletions ee/agent/storage/sqlite/logstore_sqlite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package agentsqlite

import (
"context"
"testing"
"time"

"github.com/kolide/launcher/ee/agent/flags/keys"
"github.com/stretchr/testify/require"
)

func TestAppendValue(t *testing.T) {
t.Parallel()

testRootDir := t.TempDir()

s, err := OpenRW(context.TODO(), testRootDir, WatchdogLogStore)
require.NoError(t, err, "creating test store")

flagKey := []byte(keys.UpdateChannel.String())
flagVal := []byte("beta")

startTime := time.Now().Unix()
logEntry := `{"time":"2024-05-13T18:29:31.7829101Z", "msg":"testmsg"}`
require.NoError(t, s.AppendValue(startTime, []byte(logEntry)), "expected no error appending value row")

returnedVal, err := s.Get(flagKey)
require.NoError(t, err, "expected no error getting value")
require.Equal(t, flagVal, returnedVal, "flag value mismatch")

require.NoError(t, s.Close())
}
34 changes: 0 additions & 34 deletions ee/agent/types/keyvalue_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@ type Deleter interface {
DeleteAll() error
}

// RowDeleter is an interface for deleting rows by rowid in a sql store
type RowDeleter interface {
DeleteRows(rowids ...any) error
}

// Iterator is an interface for iterating data in a key/value store.
type Iterator interface {
// ForEach executes a function for each key/value pair in a store.
Expand All @@ -38,21 +33,6 @@ type Iterator interface {
ForEach(fn func(k, v []byte) error) error
}

// TimestampedIterator is a read-only interface for iterating timestamped data.
type TimestampedIterator interface {
// ForEach executes a function for each timestamp/value pair in a store.
// If the provided function returns an error then the iteration is stopped and
// the error is returned to the caller. The provided function must not modify
// the store; this will result in undefined behavior.
ForEach(fn func(rowid, timestamp int64, v []byte) error) error
}

// TimestampedAppender is an interface for supporting the addition of timestamped values to a store
type TimestampedAppender interface {
// AppendValue takes the timestamp, and marshalled value for insertion as a new row
AppendValue(timestamp int64, value []byte) error
}

// Updater is an interface for bulk replacing data in a key/value store.
type Updater interface {
// Update takes a map of key-value pairs, and inserts
Expand Down Expand Up @@ -123,19 +103,5 @@ type GetterSetterDeleterIteratorUpdaterCounterAppender interface {
Appender
}

// TimestampedIteratorDeleterAppenderCounterCloser is an interface to support the storage and retrieval of
// sets of timestamped values. This can be used where a strict key/value interface may not suffice,
// e.g. for writing logs or historical records to sqlite
type TimestampedIteratorDeleterAppenderCounterCloser interface {
TimestampedIterator
TimestampedAppender
Counter
RowDeleter
Closer
}

// LogStore is a convenient alias for a store that supports all methods required to manipulate sqlite logs
type LogStore = TimestampedIteratorDeleterAppenderCounterCloser

// Convenient alias for a key value store that supports all methods
type KVStore = GetterSetterDeleterIteratorUpdaterCounterAppender
35 changes: 35 additions & 0 deletions ee/agent/types/logstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package types

// RowDeleter is an interface for deleting rows by rowid in a sql store
type RowDeleter interface {
DeleteRows(rowids ...any) error
}

// TimestampedIterator is a read-only interface for iterating timestamped data.
type TimestampedIterator interface {
// ForEach executes a function for each timestamp/value pair in a store.
// If the provided function returns an error then the iteration is stopped and
// the error is returned to the caller. The provided function must not modify
// the store; this will result in undefined behavior.
ForEach(fn func(rowid, timestamp int64, v []byte) error) error
}

// TimestampedAppender is an interface for supporting the addition of timestamped values to a store
type TimestampedAppender interface {
// AppendValue takes the timestamp, and marshalled value for insertion as a new row
AppendValue(timestamp int64, value []byte) error
}

// TimestampedIteratorDeleterAppenderCounterCloser is an interface to support the storage and retrieval of
// sets of timestamped values. This can be used where a strict key/value interface may not suffice,
// e.g. for writing logs or historical records to sqlite
type TimestampedIteratorDeleterAppenderCounterCloser interface {
TimestampedIterator
TimestampedAppender
Counter
RowDeleter
Closer
}

// LogStore is a convenient alias for a store that supports all methods required to manipulate sqlite logs
type LogStore = TimestampedIteratorDeleterAppenderCounterCloser

0 comments on commit e530c23

Please sign in to comment.