Skip to content

Commit

Permalink
Add MySql connection support
Browse files Browse the repository at this point in the history
Fix setting default connection from implicit workspace
Set default search path if db arg is connection
remove powerpipe config file watcher
  • Loading branch information
kaidaguerre authored Oct 22, 2024
1 parent a057e4f commit 015ed81
Show file tree
Hide file tree
Showing 20 changed files with 234 additions and 131 deletions.
1 change: 1 addition & 0 deletions .github/workflows/01-powerpipe-pre-release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ env:
ASSET_IMAGE_NAME: assets
POWERPIPE_UPDATE_CHECK: false
GH_TOKEN: ${{ secrets.GH_ACCESS_TOKEN }}
SPIPETOOLS_TOKEN: ${{ secrets.SPIPETOOLS_TOKEN }}

jobs:
goreleaser:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/02-powerpipe-release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ env:
POWERPIPE_UPDATE_CHECK: false
GH_TOKEN: ${{ secrets.GH_ACCESS_TOKEN }}
VERSION: ${{ github.event.inputs.version }}
SPIPETOOLS_TOKEN: ${{ secrets.SPIPETOOLS_TOKEN }}

jobs:

Expand Down
1 change: 1 addition & 0 deletions .github/workflows/11-test-acceptance.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ on:

env:
POWERPIPE_UPDATE_CHECK: false
SPIPETOOLS_TOKEN: ${{ secrets.SPIPETOOLS_TOKEN }}

jobs:
goreleaser:
Expand Down
8 changes: 1 addition & 7 deletions internal/cmdconfig/app_specific.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,14 @@ import (
"strings"

"github.com/Masterminds/semver/v3"
"github.com/hashicorp/hcl/v2"
"github.com/spf13/viper"
"github.com/turbot/go-kit/files"
"github.com/turbot/pipe-fittings/app_specific"
"github.com/turbot/pipe-fittings/app_specific_connection"
"github.com/turbot/pipe-fittings/cmdconfig"
"github.com/turbot/pipe-fittings/connection"
"github.com/turbot/pipe-fittings/constants"
"github.com/turbot/pipe-fittings/error_helpers"
"github.com/turbot/pipe-fittings/filepaths"
"github.com/turbot/pipe-fittings/utils"
)

// SetAppSpecificConstants sets app specific constants defined in pipe-fittings
Expand Down Expand Up @@ -80,13 +77,10 @@ func SetAppSpecificConstants() {

func registerConnections() {
app_specific_connection.RegisterConnections(
connection.NewMysqlConnection,
connection.NewSteampipePgConnection,
connection.NewPostgresConnection,
connection.NewSqliteConnection,
connection.NewDuckDbConnection,
)
// set default connections
var defaultSteampipeConnection = connection.NewSteampipePgConnection("default", hcl.Range{})
defaultSteampipeConnection.(*connection.SteampipePgConnection).ConnectionString = utils.ToStringPointer(constants.DefaultSteampipeConnectionString)
app_specific_connection.DefaultConnections["steampipe"] = defaultSteampipeConnection
}
10 changes: 8 additions & 2 deletions internal/cmdconfig/cmd_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"strings"
"time"

"github.com/hashicorp/hcl/v2"
"github.com/mattn/go-isatty"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/pipe-fittings/app_specific"
"github.com/turbot/pipe-fittings/cmdconfig"
"github.com/turbot/pipe-fittings/connection"
"github.com/turbot/pipe-fittings/constants"
"github.com/turbot/pipe-fittings/error_helpers"
"github.com/turbot/pipe-fittings/filepaths"
Expand Down Expand Up @@ -168,11 +170,15 @@ func initGlobalConfig() error_helpers.ErrorAndWarnings {

// if the configured workspace is a cloud workspace, create cloud metadata and set the default connection
if wp != nil && wp.IsCloudWorkspace() {
defaultConnection, ew := wp.GetPipesMetadata()
pipesMetadata, ew := wp.GetPipesMetadata()
if ew.GetError() != nil {
return ew
}
config.DefaultConnection = defaultConnection
defaultConnection := connection.NewSteampipePgConnection(wp.ProfileName, hcl.Range{}).(*connection.SteampipePgConnection)
defaultConnection.ConnectionString = &pipesMetadata.ConnectionString
// TODO temp for now we must call validate to populate the defaults
_ = defaultConnection.Validate()
config.SetDefaultConnection(defaultConnection)
}

// now validate all config values have appropriate values
Expand Down
11 changes: 10 additions & 1 deletion internal/cmdconfig/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,18 @@ func ValidateDatabaseArg() error {
return fmt.Errorf("connection '%s' does not implement connection.ConnectionStringProvider", databaseArg)
}
connectionString := csp.GetConnectionString()
powerpipeconfig.GlobalConfig.DefaultConnection = csp
// update viper Database arg with the connection string
viper.Set(constants.ArgDatabase, connectionString)
// if no search path is set, set it to the connection's default search path
if spp, ok := conn.(connection.SearchPathProvider); ok {
if viper.GetString(constants.ArgSearchPath) == "" {
viper.Set(constants.ArgSearchPath, spp.GetSearchPath())
}

if viper.GetString(constants.ArgSearchPathPrefix) == "" {
viper.Set(constants.ArgSearchPathPrefix, spp.GetSearchPathPrefix())
}
}
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion internal/constants/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package constants

const (
DatabaseDefaultQueryTimeout = 300
DefaultConnection = "connection.steampipe.default"
DefaultConnection = "steampipe.default"
)
12 changes: 0 additions & 12 deletions internal/dashboardexecute/leaf_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,18 +102,6 @@ func (r *LeafRun) resolveDatabaseConfig() error {
if err != nil {
return err
}
// if the resource specifies a database, use that
if c, ok := r.resource.(modconfig.DatabaseItem); ok {
if resourceDatabase := c.GetDatabase(); resourceDatabase != nil {
database = *resourceDatabase
}
if resourceSearchPath := c.GetSearchPath(); len(resourceSearchPath) > 0 {
searchPathConfig.SearchPath = resourceSearchPath
}
if resourceSearchPathPrefix := c.GetSearchPathPrefix(); len(resourceSearchPathPrefix) > 0 {
searchPathConfig.SearchPathPrefix = resourceSearchPathPrefix
}
}

r.database = database
r.searchPathConfig = searchPathConfig
Expand Down
30 changes: 28 additions & 2 deletions internal/db_client/database_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ func GetDatabaseConfigForResource(resource modconfig.ModTreeItem, workspaceMod *
database := defaultDatabase
searchPathConfig := defaultSearchPathConfig

// if there is no default search path, check if the mod has a search path
// (it's database field may refer to a connection with a search path)
if searchPathConfig.Empty() {
searchPathConfig.SearchPath = workspaceMod.GetSearchPath()
searchPathConfig.SearchPathPrefix = workspaceMod.GetSearchPathPrefix()
}

// NOTE: if the resource is in a dependency mod, check whether database or search path has been specified for it
depName := resource.GetMod().DependencyName

Expand All @@ -33,12 +40,30 @@ func GetDatabaseConfigForResource(resource modconfig.ModTreeItem, workspaceMod *
searchPathConfig.SearchPath = modRequirement.SearchPath
searchPathConfig.SearchPathPrefix = modRequirement.SearchPathPrefix
}
// if the parent mod has a database set, use it
if modDb := resource.GetMod().ModDatabase; modDb != nil {
database = *modDb
}
if modSearchPath := resource.GetMod().SearchPath; len(modSearchPath) > 0 {
searchPathConfig.SearchPath = modSearchPath
}
if modSearchPathPrefix := resource.GetMod().SearchPathPrefix; len(modSearchPathPrefix) > 0 {
searchPathConfig.SearchPathPrefix = modSearchPathPrefix
}

}

// if the resource has a database set, use it
if resource.GetDatabase() != nil {
database = *resource.GetDatabase()
}
// if the resource has a search path set, use it
if resourceSearchPath := resource.GetSearchPath(); len(resourceSearchPath) > 0 {
searchPathConfig.SearchPath = resourceSearchPath
}
if resourceSearchPathPrefix := resource.GetSearchPathPrefix(); len(resourceSearchPathPrefix) > 0 {
searchPathConfig.SearchPathPrefix = resourceSearchPathPrefix
}

// if the database is a cloud workspace, resolve the connection string
if steampipeconfig.IsPipesWorkspaceIdentifier(database) {
Expand Down Expand Up @@ -75,10 +100,11 @@ func GetDefaultDatabaseConfig(opts ...backend.ConnectOption) (string, backend.Se

// if no database is set, use the default connection
if defaultDatabase == "" {
defaultDatabase = powerpipeconfig.GlobalConfig.DefaultConnection.GetConnectionString()
defaultConnection := powerpipeconfig.GlobalConfig.GetDefaultConnection()
defaultDatabase = defaultConnection.GetConnectionString()
// if no search path has been set, use the default connection
if defaultSearchPathConfig.Empty() {
if spp, ok := powerpipeconfig.GlobalConfig.DefaultConnection.(connection.SearchPathProvider); ok {
if spp, ok := defaultConnection.(connection.SearchPathProvider); ok {
defaultSearchPathConfig = backend.SearchPathConfig{
SearchPath: spp.GetSearchPath(),
SearchPathPrefix: spp.GetSearchPathPrefix(),
Expand Down
7 changes: 4 additions & 3 deletions internal/initialisation/init_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package initialisation
import (
"context"
"fmt"
"log/slog"

"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/turbot/go-kit/helpers"
Expand All @@ -25,7 +27,6 @@ import (
"github.com/turbot/powerpipe/internal/powerpipeconfig"
"github.com/turbot/steampipe-plugin-sdk/v5/sperr"
"github.com/turbot/steampipe-plugin-sdk/v5/telemetry"
"log/slog"
)

type InitData[T modconfig.ModTreeItem] struct {
Expand Down Expand Up @@ -73,8 +74,8 @@ func NewInitData[T modconfig.ModTreeItem](ctx context.Context, cmd *cobra.Comman
i.Result.Warnings = errAndWarnings.Warnings

// if the database is NOT set in viper, and the mod has a connection string, set it
if !viper.IsSet(constants.ArgDatabase) && w.Mod.Database != nil {
viper.Set(constants.ArgDatabase, *w.Mod.Database)
if !viper.IsSet(constants.ArgDatabase) && w.Mod.ModDatabase != nil {
viper.Set(constants.ArgDatabase, *w.Mod.ModDatabase)
}

// now do the actual initialisation
Expand Down
88 changes: 7 additions & 81 deletions internal/powerpipeconfig/powerpipe_config.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
package powerpipeconfig

import (
"context"
"github.com/turbot/pipe-fittings/app_specific"
"github.com/turbot/powerpipe/internal/constants"
"log/slog"
"strings"
"sync"

"github.com/fsnotify/fsnotify"
filehelpers "github.com/turbot/go-kit/files"
"github.com/turbot/go-kit/filewatcher"
"github.com/turbot/pipe-fittings/app_specific_connection"
"github.com/turbot/pipe-fittings/connection"
)
Expand All @@ -21,16 +15,6 @@ type PowerpipeConfig struct {

PipelingConnections map[string]connection.PipelingConnection

// TODO KAI do we need file watching
watcher *filewatcher.FileWatcher
fileWatcherErrorHandler func(context.Context, error)

// Hooks
OnFileWatcherError func(context.Context, error)
OnFileWatcherEvent func(context.Context, *PowerpipeConfig)

loadLock *sync.Mutex
DefaultConnection connection.ConnectionStringProvider
// cache the connection strings for cloud workspaces (is this ok???
cloudConnectionStrings map[string]string
// lock
Expand All @@ -45,23 +29,21 @@ func NewPowerpipeConfig() *PowerpipeConfig {
}

// populate default connection
defaultConnectionName := strings.TrimPrefix(constants.DefaultConnection, "connection.")

return &PowerpipeConfig{
PipelingConnections: defaultPipelingConnections,
loadLock: &sync.Mutex{},
cloudConnectionStringLock: &sync.RWMutex{},
DefaultConnection: defaultPipelingConnections[defaultConnectionName].(connection.ConnectionStringProvider),
cloudConnectionStrings: make(map[string]string),

cloudConnectionStrings: make(map[string]string),
}
}

func (c *PowerpipeConfig) updateResources(other *PowerpipeConfig) {
c.loadLock.Lock()
defer c.loadLock.Unlock()

c.PipelingConnections = other.PipelingConnections
func (c *PowerpipeConfig) GetDefaultConnection() connection.ConnectionStringProvider {
return c.PipelingConnections[constants.DefaultConnection].(connection.ConnectionStringProvider)
}

func (c *PowerpipeConfig) SetDefaultConnection(defaultConnection connection.PipelingConnection) {
c.PipelingConnections[constants.DefaultConnection] = defaultConnection
}

func (c *PowerpipeConfig) Equals(other *PowerpipeConfig) bool {
Expand All @@ -83,62 +65,6 @@ func (c *PowerpipeConfig) Equals(other *PowerpipeConfig) bool {
return true
}

func (c *PowerpipeConfig) SetupWatcher(ctx context.Context, errorHandler func(context.Context, error)) error {
watcherOptions := &filewatcher.WatcherOptions{
Directories: c.ConfigPaths,
Include: filehelpers.InclusionsFromExtensions([]string{app_specific.ConfigExtension}),
ListFlag: filehelpers.FilesRecursive,
EventMask: fsnotify.Create | fsnotify.Remove | fsnotify.Rename | fsnotify.Write,
// we should look into passing the callback function into the underlying watcher
// we need to analyze the kind of errors that come out from the watcher and
// decide how to handle them
// OnError: errCallback,
OnChange: func(events []fsnotify.Event) {
c.handleFileWatcherEvent(ctx)
},
}
watcher, err := filewatcher.NewWatcher(watcherOptions)
if err != nil {
return err
}
c.watcher = watcher

// start the watcher
watcher.Start()

// set the file watcher error handler, which will get called when there are parsing errors
// after a file watcher event
c.fileWatcherErrorHandler = errorHandler

return nil
}

func (c *PowerpipeConfig) handleFileWatcherEvent(ctx context.Context) {
slog.Debug("PowerpipeConfig handleFileWatcherEvent")

newConfig, errAndWarnings := LoadPowerpipeConfig(c.ConfigPaths...)

if errAndWarnings.GetError() != nil {
// call error hook
if c.OnFileWatcherError != nil {
c.OnFileWatcherError(ctx, errAndWarnings.Error)
}

// Flag on workspace?
return
}

if !newConfig.Equals(c) {
c.updateResources(newConfig)

// call hook
if c.OnFileWatcherEvent != nil {
c.OnFileWatcherEvent(ctx, newConfig)
}
}

}

func (c *PowerpipeConfig) GetCloudConnectionString(workspace string) (string, bool) {
c.cloudConnectionStringLock.RLock()
defer c.cloudConnectionStringLock.RUnlock()
Expand Down
2 changes: 1 addition & 1 deletion internal/powerpipeconfig/powerpipe_config_load.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func LoadPowerpipeConfig(configPaths ...string) (res *PowerpipeConfig, errorsAnd
}

if len(diags) > 0 && lastErrorLength == len(diags) {
return nil, error_helpers.DiagsToErrorsAndWarnings("Failed to load Flowpipe config", diags)
return nil, error_helpers.DiagsToErrorsAndWarnings("Failed to load Powerpipe config", diags)
}

lastErrorLength = len(diags)
Expand Down
9 changes: 9 additions & 0 deletions tests/acceptance/test_data/mods/mysql_mod/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ This is a simple mod used for testing Powerpipe's ability to connect to a MySQL
#### Connect using mysql ####

Start your MySQL server and connect to it.
Create a table(something like this to test the queries):
mysql> SELECT * FROM employees;
+----+---------+------+---------+
| id | name | age | salary |
+----+---------+------+---------+
| 1 | Alice | 25 | 50000.5 |
| 2 | Bob | 30 | 60000.8 |
| 3 | Charlie | 35 | 55000.2 |
+----+---------+------+---------+

#### Connect using powerpipe ####

Expand Down
1 change: 1 addition & 0 deletions tests/acceptance/test_data/mods/mysql_mod/mod.pp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod "mysql_mod"{
title = "MySQL mod"
description = "This is a simple mod used for testing powerpipe's feature of connecting to a MySQL backend."
database = connection.mysql.testdb
}
Loading

0 comments on commit 015ed81

Please sign in to comment.