Skip to content

Commit

Permalink
feature complete
Browse files Browse the repository at this point in the history
  • Loading branch information
raulb committed Nov 25, 2024
1 parent c00924b commit 00f153b
Show file tree
Hide file tree
Showing 8 changed files with 230 additions and 287 deletions.
71 changes: 71 additions & 0 deletions cmd/conduit/internal/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package internal

import (
"fmt"
"os"
"strings"

"github.com/conduitio/conduit/pkg/conduit"
"github.com/spf13/viper"
)

const (
CONDUIT_PREFIX = "CONDUIT"

Check failure on line 13 in cmd/conduit/internal/config.go

View workflow job for this annotation

GitHub Actions / golangci-lint

ST1003: should not use ALL_CAPS in Go names; use CamelCase instead (stylecheck)
)

// LoadConfigFromFile loads on cfg, the configuration from the file at path.
func LoadConfigFromFile(filePath string, cfg *conduit.Config) error {
v := viper.New()

// Set the file name and path
v.SetConfigFile(filePath)

// Attempt to read the configuration file
if err := v.ReadInConfig(); err != nil {
// here we could simply log conduit.yaml file doesn't exist since this is optional
//return fmt.Errorf("error reading config file: %w", err)

Check failure on line 26 in cmd/conduit/internal/config.go

View workflow job for this annotation

GitHub Actions / golangci-lint

commentFormatting: put a space between `//` and comment text (gocritic)
return nil
}

// Unmarshal the config into the cfg struct
if err := v.Unmarshal(&cfg); err != nil {
return fmt.Errorf("unable to decode into struct: %w", err)
}

return nil
}

// TODO: check if logger is correct
func LoadConfigFromEnv(cfg *conduit.Config) error {
v := viper.New()

// Set environment variable prefix
v.SetEnvPrefix(CONDUIT_PREFIX)

// Automatically map environment variables
v.AutomaticEnv()
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))

for _, env := range os.Environ() {
pair := strings.SplitN(env, "=", 2)
key := pair[0]
value := pair[1]

// Check if the environment variable has the desired prefix
if strings.HasPrefix(key, fmt.Sprintf("%s_", CONDUIT_PREFIX)) {
// Strip the prefix and replace underscores with dots
strippedKey := strings.ToLower(strings.TrimPrefix(key, fmt.Sprintf("%s_", CONDUIT_PREFIX)))
strippedKey = strings.ReplaceAll(strippedKey, "_", ".")

// Set the value in Viper
v.Set(strippedKey, value)
}
}

// Unmarshal the environment variables into the config struct
err := v.Unmarshal(cfg)
if err != nil {
return fmt.Errorf("error unmarshalling config from environment variables: %w", err)
}
return nil
}
3 changes: 2 additions & 1 deletion cmd/conduit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ import (

func main() {
e := ecdysis.New()
cmd := e.MustBuildCobraCommand(&root.RootCommand{})

cmd := e.MustBuildCobraCommand(&root.RootCommand{})
cmd.CompletionOptions.DisableDefaultCmd = true

if err := cmd.Execute(); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
os.Exit(0)
}
59 changes: 21 additions & 38 deletions cmd/conduit/root/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,24 @@ package root

import (
"context"
"flag"
"fmt"
"os"
"path/filepath"
"reflect"

"github.com/conduitio/conduit/cmd/conduit/internal"
"github.com/conduitio/conduit/pkg/conduit"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/ecdysis"
"github.com/conduitio/yaml/v3"
)

var (
_ ecdysis.CommandWithFlags = (*InitCommand)(nil)
_ ecdysis.CommandWithExecute = (*InitCommand)(nil)
_ ecdysis.CommandWithDocs = (*InitCommand)(nil)
)

type InitFlags struct {
ConfigPath string `flag:"config.path" usage:"path where Conduit will be initialized"`
}

type InitCommand struct {
flags InitFlags
rootFlags *RootFlags
}

func (c *InitCommand) Usage() string { return "init" }
Expand All @@ -51,10 +45,12 @@ func (c *InitCommand) Docs() ecdysis.Docs {
}

func (c *InitCommand) createDirs() error {
// These could be used based on the root flags if those were global
dirs := []string{"processors", "connectors", "pipelines"}
conduitPath := filepath.Dir(c.rootFlags.ConduitConfigPath)

for _, dir := range dirs {
path := filepath.Join(c.flags.ConfigPath, dir)
path := filepath.Join(conduitPath, dir)

// Attempt to create the directory, skipping if it already exists
if err := os.Mkdir(path, os.ModePerm); err != nil {
Expand All @@ -71,28 +67,29 @@ func (c *InitCommand) createDirs() error {
return nil
}

func (c *InitCommand) conduitCfgFlags() *flag.FlagSet {
cfg := conduit.DefaultConfigWithBasePath(c.flags.ConfigPath)
return conduit.Flags(&cfg)
}

func (c *InitCommand) createConfigYAML() error {
cfgYAML := internal.NewYAMLTree()
c.conduitCfgFlags().VisitAll(func(f *flag.Flag) {
cfgYAML.Insert(f.Name, f.DefValue, f.Usage)
})

v := reflect.Indirect(reflect.ValueOf(c.rootFlags))
t := v.Type()

for i := 0; i < v.NumField(); i++ {
field := t.Field(i)
value := fmt.Sprintf("%v", v.Field(i).Interface())
usage := field.Tag.Get("usage")
longName := field.Tag.Get("long")
cfgYAML.Insert(longName, value, usage)
}
yamlData, err := yaml.Marshal(cfgYAML.Root)
if err != nil {
return cerrors.Errorf("error marshaling YAML: %w\n", err)
}

path := filepath.Join(c.flags.ConfigPath, "conduit.yaml")
err = os.WriteFile(path, yamlData, 0o600)
err = os.WriteFile(c.rootFlags.ConduitConfigPath, yamlData, 0o600)
if err != nil {
return cerrors.Errorf("error writing conduit.yaml: %w", err)
}
fmt.Printf("Configuration file written to %v\n", path)
fmt.Printf("Configuration file written to %v\n", c.rootFlags.ConduitConfigPath)

return nil
}
Expand All @@ -109,24 +106,10 @@ func (c *InitCommand) Execute(ctx context.Context) error {
}

fmt.Println(`
Conduit has been initialized!
To quickly create an example pipeline, run 'conduit pipelines init'.
To see how you can customize your first pipeline, run 'conduit pipelines init --help'.`)
Conduit has been initialized!
To quickly create an example pipeline, run 'conduit pipelines init'.
To see how you can customize your first pipeline, run 'conduit pipelines init --help'.`)

return nil
}

func (c *InitCommand) Flags() []ecdysis.Flag {
flags := ecdysis.BuildFlags(&c.flags)

// Set current working directory as default
currentPath, err := os.Getwd()
if err != nil {
panic(cerrors.Errorf("failed to get current working directory: %w", err))
}

flags.SetDefault("config.path", currentPath)

return flags
}
134 changes: 126 additions & 8 deletions cmd/conduit/root/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ package root
import (
"context"
"fmt"
"os"
"path/filepath"
"time"

"github.com/conduitio/conduit/cmd/conduit/internal"
"github.com/conduitio/conduit/cmd/conduit/root/pipelines"
"github.com/conduitio/conduit/pkg/conduit"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/ecdysis"
)

Expand All @@ -32,6 +36,16 @@ var (
)

type RootFlags struct {
// Global flags -----------------------------------------------------------

// Conduit configuration file
ConduitConfigPath string `long:"config" usage:"global conduit configuration file" persistent:"true" default:"./conduit.yaml"`

// Version
Version bool `long:"version" short:"v" usage:"show current Conduit version" persistent:"true"`

// Root flags -------------------------------------------------------------

// Database configuration
DBType string `long:"db.type" usage:"database type; accepts badger,postgres,inmemory,sqlite"`
DBBadgerPath string `long:"db.badger.path" usage:"path to badger DB"`
Expand Down Expand Up @@ -73,35 +87,139 @@ type RootFlags struct {
DevCPUProfile string `long:"dev.cpuprofile" usage:"write CPU profile to file"`
DevMemProfile string `long:"dev.memprofile" usage:"write memory profile to file"`
DevBlockProfile string `long:"dev.blockprofile" usage:"write block profile to file"`

// Version
Version bool `long:"version" short:"v" usage:"show version" persistent:"true"`
}

type RootCommand struct {
flags RootFlags
cfg conduit.Config
}

func (c *RootCommand) updateConfigFromFlags() {
c.cfg.DB.Type = c.flags.DBType
c.cfg.DB.Postgres.ConnectionString = c.flags.DBPostgresConnectionString
c.cfg.DB.Postgres.Table = c.flags.DBPostgresTable
c.cfg.DB.SQLite.Table = c.flags.DBSQLiteTable

// Map API configuration
c.cfg.API.Enabled = c.flags.APIEnabled
c.cfg.API.HTTP.Address = c.flags.APIHTTPAddress
c.cfg.API.GRPC.Address = c.flags.APIGRPCAddress

// Map logging configuration
c.cfg.Log.Level = c.flags.LogLevel
c.cfg.Log.Format = c.flags.LogFormat

// Map pipeline configuration
c.cfg.Pipelines.ExitOnDegraded = c.flags.PipelinesExitOnDegraded
c.cfg.Pipelines.ErrorRecovery.MinDelay = c.flags.PipelinesErrorRecoveryMinDelay
c.cfg.Pipelines.ErrorRecovery.MaxDelay = c.flags.PipelinesErrorRecoveryMaxDelay
c.cfg.Pipelines.ErrorRecovery.BackoffFactor = c.flags.PipelinesErrorRecoveryBackoffFactor
c.cfg.Pipelines.ErrorRecovery.MaxRetries = c.flags.PipelinesErrorRecoveryMaxRetries
c.cfg.Pipelines.ErrorRecovery.MaxRetriesWindow = c.flags.PipelinesErrorRecoveryMaxRetriesWindow

// Map schema registry configuration
c.cfg.SchemaRegistry.Type = c.flags.SchemaRegistryType
c.cfg.SchemaRegistry.Confluent.ConnectionString = c.flags.SchemaRegistryConfluentConnectionString

// Map preview features
c.cfg.Preview.PipelineArchV2 = c.flags.PreviewPipelineArchV2

// Map development profiling
c.cfg.Dev.CPUProfile = c.flags.DevCPUProfile
c.cfg.Dev.MemProfile = c.flags.DevMemProfile
c.cfg.Dev.BlockProfile = c.flags.DevBlockProfile

// Update paths
c.cfg.DB.SQLite.Path = c.flags.DBSQLitePath
c.cfg.DB.Badger.Path = c.flags.DBBadgerPath
c.cfg.Pipelines.Path = c.flags.PipelinesPath
c.cfg.Connectors.Path = c.flags.ConnectorsPath
c.cfg.Processors.Path = c.flags.ProcessorsPath
}

func (c *RootCommand) updateFlagValuesFromConfig() {
// Map database configuration
c.flags.DBType = c.cfg.DB.Type
c.flags.DBPostgresConnectionString = c.cfg.DB.Postgres.ConnectionString
c.flags.DBPostgresTable = c.cfg.DB.Postgres.Table
c.flags.DBSQLiteTable = c.cfg.DB.SQLite.Table

// Map API configuration
c.flags.APIEnabled = c.cfg.API.Enabled
c.flags.APIHTTPAddress = c.cfg.API.HTTP.Address
c.flags.APIGRPCAddress = c.cfg.API.GRPC.Address

// Map logging configuration
c.flags.LogLevel = c.cfg.Log.Level
c.flags.LogFormat = c.cfg.Log.Format

// Map pipeline configuration
c.flags.PipelinesExitOnDegraded = c.cfg.Pipelines.ExitOnDegraded
c.flags.PipelinesErrorRecoveryMinDelay = c.cfg.Pipelines.ErrorRecovery.MinDelay
c.flags.PipelinesErrorRecoveryMaxDelay = c.cfg.Pipelines.ErrorRecovery.MaxDelay
c.flags.PipelinesErrorRecoveryBackoffFactor = c.cfg.Pipelines.ErrorRecovery.BackoffFactor
c.flags.PipelinesErrorRecoveryMaxRetries = c.cfg.Pipelines.ErrorRecovery.MaxRetries
c.flags.PipelinesErrorRecoveryMaxRetriesWindow = c.cfg.Pipelines.ErrorRecovery.MaxRetriesWindow

// Map schema registry configuration
c.flags.SchemaRegistryType = c.cfg.SchemaRegistry.Type
c.flags.SchemaRegistryConfluentConnectionString = c.cfg.SchemaRegistry.Confluent.ConnectionString

// Map preview features
c.flags.PreviewPipelineArchV2 = c.cfg.Preview.PipelineArchV2

// Map development profiling
c.flags.DevCPUProfile = c.cfg.Dev.CPUProfile
c.flags.DevMemProfile = c.cfg.Dev.MemProfile
c.flags.DevBlockProfile = c.cfg.Dev.BlockProfile

// Update paths
c.flags.DBSQLitePath = c.cfg.DB.SQLite.Path
c.flags.DBBadgerPath = c.cfg.DB.Badger.Path
c.flags.PipelinesPath = c.cfg.Pipelines.Path
c.flags.ConnectorsPath = c.cfg.Connectors.Path
c.flags.ProcessorsPath = c.cfg.Processors.Path
}

func (c *RootCommand) Execute(ctx context.Context) error {
if c.flags.Version {
// TODO: use the logger instead
fmt.Print(conduit.Version(true))
_, _ = fmt.Fprintf(os.Stdout, "%s\n", conduit.Version(true))
return nil
}

// 1. Load conduit configuration file and update general config.
if err := internal.LoadConfigFromFile(c.flags.ConduitConfigPath, &c.cfg); err != nil {
return err
}

// 2. Load environment variables and update general config.
if err := internal.LoadConfigFromEnv(&c.cfg); err != nil {
return err
}

// 3. Update the general config from flags.
c.updateConfigFromFlags()

// 4. Update flags from global configuration (this will be needed for conduit init)
c.updateFlagValuesFromConfig()

e := &conduit.Entrypoint{}
e.Serve(c.cfg)

return nil
}

func (c *RootCommand) Usage() string { return "conduit" }
func (c *RootCommand) Flags() []ecdysis.Flag {
flags := ecdysis.BuildFlags(&c.flags)

c.cfg = conduit.DefaultConfig()
currentPath, err := os.Getwd()
if err != nil {
panic(cerrors.Errorf("failed to get current working directory: %w", err))
}
c.cfg = conduit.DefaultConfigWithBasePath(currentPath)

conduitConfigPath := filepath.Join(currentPath, "conduit.yaml")
flags.SetDefault("config.path", conduitConfigPath)
flags.SetDefault("db.type", c.cfg.DB.Type)
flags.SetDefault("db.badger.path", c.cfg.DB.Badger.Path)
flags.SetDefault("db.postgres.connection-string", c.cfg.DB.Postgres.ConnectionString)
Expand Down Expand Up @@ -138,7 +256,7 @@ func (c *RootCommand) Docs() ecdysis.Docs {

func (c *RootCommand) SubCommands() []ecdysis.Command {
return []ecdysis.Command{
&InitCommand{},
&InitCommand{rootFlags: &c.flags},
&pipelines.PipelinesCommand{},
}
}
Loading

0 comments on commit 00f153b

Please sign in to comment.