Skip to content

Commit

Permalink
Service management improvements. Closes #5
Browse files Browse the repository at this point in the history
  • Loading branch information
binaek authored Jan 20, 2021
1 parent 5cdf4c4 commit f88a826
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 48 deletions.
15 changes: 7 additions & 8 deletions cmd/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,21 +219,20 @@ func runPluginRemoveCmd(cmd *cobra.Command, args []string) {

// returns a map of pluginFullName -> []{connections using pluginFullName}
func getPluginConnectionMap() (map[string][]string, error) {
didWeStartService := false
status, err := db.GetStatus()
if err != nil {
return nil, fmt.Errorf("Could not start steampipe service")
}

if status == nil {
// the db service is not started - start it
db.StartService()
didWeStartService = true
}

if didWeStartService {
// stop the database if we started it!
defer db.StopDB(true)
db.StartService(db.InvokerPlugin)
defer func() {
status, _ := db.GetStatus()
if status.Invoker == db.InvokerPlugin {
db.StopDB(true)
}
}()
}

client, err := db.GetClient(true)
Expand Down
25 changes: 15 additions & 10 deletions cmd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"fmt"
"strings"

"github.com/turbot/steampipe/cmdconfig"

"github.com/spf13/cobra"
"github.com/turbot/steampipe-plugin-sdk/logging"

"github.com/turbot/steampipe/cmdconfig"
"github.com/turbot/steampipe/constants"
"github.com/turbot/steampipe/db"
"github.com/turbot/steampipe/utils"
Expand Down Expand Up @@ -55,11 +55,11 @@ connection from any Postgres compatible database client.`,
cmdconfig.
OnCmd(cmd).
AddBoolFlag("background", "", true, "Run service in the background").
AddBoolFlag("refresh", "", true, "Refresh connections").
// TODO(nw) default to the configuration option?
AddIntFlag("db-port", "", constants.DatabasePort, "Database service port. Chooses a free port by default.").
// TODO(nw) should be validated to an enumerated list
AddStringFlag("listen", "", "network", "Accept connections from: local (localhost only) or network (open)")
AddIntFlag("db-port", "", constants.DatabasePort, "Database service port.").
AddStringFlag("listen", "", string(db.ListenTypeNetwork), "Accept connections from: local (localhost only) or network (open)").
// Hidden flags for internal use
AddStringFlag("invoker", "", string(db.InvokerService), "Invoked by `service` or `query`", cmdconfig.FlagOptions.Hidden()).
AddBoolFlag("refresh", "", true, "Refresh connections on startup", cmdconfig.FlagOptions.Hidden())

return cmd
}
Expand Down Expand Up @@ -124,15 +124,20 @@ func runServiceStartCmd(cmd *cobra.Command, args []string) {
}

listen := db.StartListenType(cmdconfig.Viper().GetString("listen"))

if err := listen.IsValid(); err != nil {
utils.ShowError(err)
return
}

invoker := db.Invoker(cmdconfig.Viper().GetString("invoker"))
if err := invoker.IsValid(); err != nil {
utils.ShowError(err)
return
}

db.EnsureDBInstalled()

status, err := db.StartDB(cmdconfig.Viper().GetInt("db-port"), listen)
status, err := db.StartDB(cmdconfig.Viper().GetInt("db-port"), listen, invoker)
if err != nil {
utils.ShowError(err)
return
Expand Down Expand Up @@ -189,7 +194,7 @@ to force a restart.
return
}

status, err := db.StartDB(currentServiceStatus.Port, currentServiceStatus.ListenType)
status, err := db.StartDB(currentServiceStatus.Port, currentServiceStatus.ListenType, currentServiceStatus.Invoker)
if err != nil {
utils.ShowError(err)
return
Expand Down
8 changes: 8 additions & 0 deletions cmdconfig/cmd_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ type flagOpt func(c *cobra.Command, name string, key string, v *viper.Viper)
// FlagOptions :: shortcut for common flag options
var FlagOptions = struct {
Required func() flagOpt
Hidden func() flagOpt
}{
Required: requiredOpt,
Hidden: hiddenOpt,
}

// Helper function to mark a flag as required
Expand All @@ -29,3 +31,9 @@ func requiredOpt() flagOpt {
c.Flag(name).Usage = fmt.Sprintf("%s %s", u, requiredColor("(required)"))
}
}

func hiddenOpt() flagOpt {
return func(c *cobra.Command, name, key string, v *viper.Viper) {
c.Flag(name).Hidden = true
}
}
7 changes: 7 additions & 0 deletions constants/colors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package constants

import (
"github.com/fatih/color"
)

var Bold = color.New(color.Bold).SprintFunc()
7 changes: 4 additions & 3 deletions db/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func EnsureDBInstalled() {
}

func installSteampipeHub() error {
StartService()
StartService(InvokerInstaller)
rawClient, err := createDbClient()
if err != nil {
return err
Expand All @@ -105,11 +105,12 @@ func installSteampipeHub() error {
return nil
}

func StartService() {
// StartService :: invokes `steampipe service start --listen local --refresh=false --invoker query`
func StartService(invoker Invoker) {
log.Println("[TRACE] start service")
// spawn a process to start the service, passing refresh=false to ensure we DO NOT refresh connections
// (as we will do that ourselves)
cmd := exec.Command(os.Args[0], "service", "start", "--listen", "local", "--refresh=false")
cmd := exec.Command(os.Args[0], "service", "start", "--listen", "local", "--refresh=false", "--invoker", string(invoker))
cmd.Start()
startedAt := time.Now()
spinnerShown := false
Expand Down
4 changes: 2 additions & 2 deletions db/interactive_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ func (c *InteractiveClient) close() {
}

// InteractiveQuery :: start an interactive prompt and return
func (c *InteractiveClient) InteractiveQuery(resultsStreamer *ResultStreamer, stopServiceWhenDone bool) {
func (c *InteractiveClient) InteractiveQuery(resultsStreamer *ResultStreamer, onCompleteCallback func()) {
defer func() {

shutdown(c.client, stopServiceWhenDone)
onCompleteCallback()

r := recover()
switch r.(type) {
Expand Down
32 changes: 21 additions & 11 deletions db/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ import (
"log"

"github.com/turbot/steampipe-plugin-sdk/logging"

"github.com/turbot/steampipe/constants"
"github.com/turbot/steampipe/utils"
)

// ExecuteQuery :: entry point for executing ad-hoc queries from outside the package
func ExecuteQuery(queryString string) (*ResultStreamer, error) {
var didWeStartService bool
var err error

logging.LogTime("db.ExecuteQuery start")
Expand All @@ -23,10 +24,13 @@ func ExecuteQuery(queryString string) (*ResultStreamer, error) {
return nil, errors.New("could not retrieve service status")
}

if status != nil && status.Invoker == InvokerQuery {
return nil, fmt.Errorf("You already have a %s session open. To run multiple sessions, first run %s", constants.Bold("steampipe query"), constants.Bold("steampipe service start"))
}

if status == nil {
// the db service is not started - start it
StartService()
didWeStartService = true
StartService(InvokerQuery)
}

client, err := GetClient(false)
Expand All @@ -35,40 +39,46 @@ func ExecuteQuery(queryString string) (*ResultStreamer, error) {
// refresh connections
if err = refreshConnections(client); err != nil {
// shutdown the service if something went wrong!!!
shutdown(client, didWeStartService)
shutdown(client)
return nil, fmt.Errorf("failed to refresh connections: %v", err)
}

resultsStreamer := newQueryResults()

// this is a callback to close the db et-al. when things get done - no matter the mode
onComplete := func() { shutdown(client) }

if queryString == "" {
interactiveClient, err := newInteractiveClient(client)
utils.FailOnErrorWithMessage(err, "interactive client failed to initialize")

// start the interactive prompt in a go routine
go interactiveClient.InteractiveQuery(resultsStreamer, didWeStartService)
go interactiveClient.InteractiveQuery(resultsStreamer, onComplete)
} else {
result, err := client.executeQuery(queryString)
if err != nil {
return nil, err
}
// send a single result to the streamer - this will close the channel afterwards
// pass an onComplete callback function to shutdown the db
onComplete := func() { shutdown(client, didWeStartService) }
// pass the onComplete callback function to shutdown the db
onComplete := func() { shutdown(client) }
go resultsStreamer.streamSingleResult(result, onComplete)
}

logging.LogTime("db.ExecuteQuery end")
return resultsStreamer, nil
}

func shutdown(client *Client, stopService bool) {
log.Println("[TRACE] shutdown", stopService)
func shutdown(client *Client) {
log.Println("[TRACE] shutdown")
if client != nil {
client.close()
}

// force stop
if stopService {
status, _ := GetStatus()

// force stop if invoked by `query` and we are the last one
if status.Invoker == InvokerQuery {
_, err := StopDB(true)
if err != nil {
utils.ShowError(err)
Expand Down
1 change: 1 addition & 0 deletions db/running_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type RunningDBInstanceInfo struct {
Port int
Listen []string
ListenType StartListenType
Invoker Invoker
Password string
User string
Database string
Expand Down
49 changes: 35 additions & 14 deletions db/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ type StartResult int
// StartListenType :: pseudoEnum of network binding for postgres
type StartListenType string

// Invoker :: pseudoEnum for what starts the service
type Invoker string

const (
// ServiceStarted :: StartResult - Service was started
ServiceStarted StartResult = iota
Expand All @@ -29,29 +32,43 @@ const (
)

const (
// NetworkListenType :: StartListenType - bind to all known interfaces
NetworkListenType StartListenType = "network"
// LocalListenType :: StartListenType - bind to localhost only
LocalListenType = "local"
// ListenTypeNetwork :: StartListenType - bind to all known interfaces
ListenTypeNetwork StartListenType = "network"
// ListenTypeLocal :: StartListenType - bind to localhost only
ListenTypeLocal = "local"
)

const (
// InvokerService :: Invoker - when invoked by `service start`
InvokerService Invoker = "service"
// InvokerQuery :: Invoker - when invoked by `query`
InvokerQuery = "query"
// InvokerInstaller :: Invoker - when invoked by the `installer`
InvokerInstaller = "installer"
// InvokerPlugin :: Invoker - when invoked by the `pluginmanager`
InvokerPlugin = "plugin"
)

// IsValid :: validator for StartListenType known values
func (slt StartListenType) IsValid() error {
switch slt {
case NetworkListenType, LocalListenType:
case ListenTypeNetwork, ListenTypeLocal:
return nil
}
return fmt.Errorf("Invalid listen type. Can be one of '%v' or '%v'", NetworkListenType, LocalListenType)
return fmt.Errorf("Invalid listen type. Can be one of '%v' or '%v'", ListenTypeNetwork, ListenTypeLocal)
}

// StartDBWithDefaults :: start the database, if not already running
// on `127.0.0.1:9193`
func StartDBWithDefaults() (StartResult, error) {
return StartDB(constants.DatabasePort, "local")
// IsValid :: validator for Invoker known values
func (slt Invoker) IsValid() error {
switch slt {
case InvokerService, InvokerQuery, InvokerInstaller, InvokerPlugin:
return nil
}
return fmt.Errorf("Invalid invoker. Can be one of '%v', '%v', '%v' or '%v'", InvokerService, InvokerQuery, InvokerInstaller, InvokerPlugin)
}

// StartDB :: start the database is not already running
func StartDB(port int, listen StartListenType) (StartResult, error) {
func StartDB(port int, listen StartListenType, invoker Invoker) (StartResult, error) {
info, err := loadRunningInstanceInfo()

if err != nil {
Expand All @@ -64,6 +81,9 @@ func StartDB(port int, listen StartListenType) (StartResult, error) {
return ServiceFailedToStart, err
}
if processRunning {
if info.Invoker == InvokerQuery {
return ServiceAlreadyRunning, fmt.Errorf("You have a %s session open. Close this session before running %s", constants.Bold("steampipe query"), constants.Bold("steampipe service start"))
}
return ServiceAlreadyRunning, nil
}
}
Expand All @@ -75,7 +95,7 @@ func StartDB(port int, listen StartListenType) (StartResult, error) {

listenAddresses := "localhost"

if listen == NetworkListenType {
if listen == ListenTypeNetwork {
listenAddresses = "*"
}

Expand Down Expand Up @@ -139,9 +159,10 @@ func StartDB(port int, listen StartListenType) (StartResult, error) {
runningInfo.User = constants.DatabaseSuperUser
runningInfo.Database = "postgres"
runningInfo.ListenType = listen
runningInfo.Invoker = invoker

runningInfo.Listen = []string{"localhost", "127.0.0.1"}
if listen == NetworkListenType {
if listen == ListenTypeNetwork {
addrs, _ := localAddresses()
runningInfo.Listen = append(addrs, runningInfo.Listen...)
}
Expand All @@ -164,7 +185,7 @@ func StartDB(port int, listen StartListenType) (StartResult, error) {
// remove info file (if any)
_ = removeRunningInstanceInfo()
// try restarting
return StartDB(port, listen)
return StartDB(port, listen, invoker)
}

// there was nothing to kill.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/fatih/color v1.7.0
github.com/gertd/go-pluralize v0.1.7
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/google/uuid v1.1.2
github.com/hashicorp/go-hclog v0.14.1
github.com/hashicorp/go-plugin v1.3.0
github.com/hashicorp/go-version v1.2.1
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXi
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
Expand Down

0 comments on commit f88a826

Please sign in to comment.