From 1aefd2470c8c092d675c552c1b44d30942d14684 Mon Sep 17 00:00:00 2001 From: Valere JEANTET Date: Wed, 22 Nov 2017 21:15:54 +0100 Subject: [PATCH] refactor lib and parser sub package (#59) * lib and parser refactored to entrypoint * add codecov.yml --- api/assets.go | 10 +- api/pipelines.go | 42 ++- cmd/bitfan/commands/run.go | 74 ++--- cmd/bitfan/commands/test.go | 14 +- codecov.yml | 3 + core/core.go | 72 ----- core/options.go | 1 - core/store/pipelines.go | 21 +- core/store/store.go | 48 ++- entrypoint/entrypoint.go | 261 ++++++++++++++++ .../entrypoint_test.go | 2 +- .../parser/logstash}/expression.go | 5 +- .../parser/logstash/expression_lexer.go | 2 +- .../parser/logstash}/expression_test.go | 4 +- .../parser/logstash}/lexer.go | 10 +- .../parser/logstash}/lexerStream.go | 2 +- .../parser/logstash}/parser.go | 124 ++++---- .../parser/logstash}/parser_test.go | 2 +- .../parser/logstash}/token.go | 20 +- {lib => entrypoint/parser}/pipelinebuilder.go | 53 ++-- go.list-dependencies.sh | 2 +- lib/location.go | 283 ------------------ 22 files changed, 532 insertions(+), 523 deletions(-) create mode 100644 codecov.yml create mode 100644 entrypoint/entrypoint.go rename lib/location_test.go => entrypoint/entrypoint_test.go (78%) rename {parser/conditionalexpression => entrypoint/parser/logstash}/expression.go (92%) rename parser/conditionalexpression/lexer.go => entrypoint/parser/logstash/expression_lexer.go (99%) rename {parser/conditionalexpression => entrypoint/parser/logstash}/expression_test.go (97%) rename {parser => entrypoint/parser/logstash}/lexer.go (97%) rename {parser => entrypoint/parser/logstash}/lexerStream.go (98%) rename {parser => entrypoint/parser/logstash}/parser.go (81%) rename {parser => entrypoint/parser/logstash}/parser_test.go (80%) rename {parser => entrypoint/parser/logstash}/token.go (84%) rename {lib => entrypoint/parser}/pipelinebuilder.go (90%) delete mode 100644 lib/location.go diff --git a/api/assets.go b/api/assets.go index d4f33087..dcd69cf8 100644 --- a/api/assets.go +++ b/api/assets.go @@ -9,7 +9,7 @@ import ( uuid "github.com/nu7hatch/gouuid" "github.com/vjeantet/bitfan/core" "github.com/vjeantet/bitfan/core/models" - "github.com/vjeantet/bitfan/parser" + "github.com/vjeantet/bitfan/entrypoint/parser/logstash" ) type AssetApiController struct { @@ -26,13 +26,13 @@ func (a *AssetApiController) CheckSyntax(c *gin.Context) { return } - _, err = parser.NewParser(bytes.NewReader(asset.Value)).Parse() + _, err = logstash.NewParser(bytes.NewReader(asset.Value)).Parse() if err != nil { c.JSON(200, gin.H{ - "l": err.(*parser.ParseError).Line, - "c": err.(*parser.ParseError).Column, + "l": err.(*logstash.ParseError).Line, + "c": err.(*logstash.ParseError).Column, "uuid": asset.Uuid, - "m": err.(*parser.ParseError).Reason, + "m": err.(*logstash.ParseError).Reason, }) } else { c.JSON(200, gin.H{ diff --git a/api/pipelines.go b/api/pipelines.go index b5483007..6c7d9d90 100644 --- a/api/pipelines.go +++ b/api/pipelines.go @@ -12,6 +12,7 @@ import ( uuid "github.com/nu7hatch/gouuid" "github.com/vjeantet/bitfan/core" "github.com/vjeantet/bitfan/core/models" + "github.com/vjeantet/bitfan/entrypoint" "github.com/vjeantet/jodaTime" ) @@ -80,7 +81,7 @@ func (p *PipelineApiController) Create(c *gin.Context) { // Handle optinal Start if pipeline.Active == true { - err = core.StartPipelineByUUID(pipeline.Uuid) + err = p.startPipelineByUUID(pipeline.Uuid) if err != nil { c.JSON(500, models.Error{Message: err.Error()}) return @@ -90,6 +91,41 @@ func (p *PipelineApiController) Create(c *gin.Context) { c.Redirect(302, fmt.Sprintf("/%s/pipelines/%s", p.path, pipeline.Uuid)) } +func (p *PipelineApiController) startPipelineByUUID(UUID string) error { + tPipeline, err := core.Storage().FindOnePipelineByUUID(UUID, true) + if err != nil { + return err + } + + entryPointPath, err := core.Storage().PreparePipelineExecutionStage(&tPipeline) + if err != nil { + return err + } + + var loc *entrypoint.Entrypoint + loc, err = entrypoint.New(entryPointPath, "", entrypoint.CONTENT_REF) + if err != nil { + return err + } + + ppl := loc.ConfigPipeline() + ppl.Name = tPipeline.Label + ppl.Uuid = tPipeline.Uuid + + agt, err := loc.ConfigAgents() + if err != nil { + return err + } + + nUUID, err := core.StartPipeline(&ppl, agt) + if err != nil { + return err + } + + apiLogger.Debugf("Pipeline %s started UUID=%s", tPipeline.Label, nUUID) + return nil +} + func (p *PipelineApiController) Find(c *gin.Context) { pipelines := core.Storage().FindPipelines(false) @@ -162,7 +198,7 @@ func (p *PipelineApiController) UpdateByUUID(c *gin.Context) { c.JSON(500, models.Error{Message: err.Error()}) return } - err = core.StartPipelineByUUID(uuid) + err = p.startPipelineByUUID(uuid) if err != nil { c.JSON(500, models.Error{Message: err.Error()}) return @@ -179,7 +215,7 @@ func (p *PipelineApiController) UpdateByUUID(c *gin.Context) { switch nextActive { case true: // start pipeline apiLogger.Debugf("starting pipeline %s", uuid) - err := core.StartPipelineByUUID(uuid) + err := p.startPipelineByUUID(uuid) if err != nil { c.JSON(500, models.Error{Message: err.Error()}) return diff --git a/cmd/bitfan/commands/run.go b/cmd/bitfan/commands/run.go index bffe3e2a..81a5d891 100644 --- a/cmd/bitfan/commands/run.go +++ b/cmd/bitfan/commands/run.go @@ -26,7 +26,7 @@ import ( "github.com/vjeantet/bitfan/api" "github.com/vjeantet/bitfan/core" - "github.com/vjeantet/bitfan/lib" + "github.com/vjeantet/bitfan/entrypoint" ) func init() { @@ -63,41 +63,60 @@ When no configuration is passed to the command, bitfan use the config set in glo } } - // AutoStart pipelines only when no configuration given as command line args + core.Start(opt) + core.Log().Infoln("bitfan ready") + + // Start Pipelines + + // Prepare entrypoints + var entrypoints entrypoint.EntrypointList + // From Storage when len == 0 if len(args) == 0 { - opt.AutoStart = true - } + pipelinesToStart := core.Storage().FindPipelinesWithAutoStart(true) + for _, p := range pipelinesToStart { + entryPointPath, err := core.Storage().PreparePipelineExecutionStage(&p) + if err != nil { + core.Log().Fatalln(err) + } - core.Start(opt) + var loc *entrypoint.Entrypoint + loc, err = entrypoint.New(entryPointPath, "", entrypoint.CONTENT_REF) + loc.PipelineName = p.Label + loc.PipelineUuid = p.Uuid + if err != nil { + core.Log().Fatalln(err) + } + entrypoints.AddEntrypoint(loc) + } + } - // Start configumation in config or in STDIN - // TODO : Refactor with RunAutoStartPipelines - var locations lib.Locations + // From config when config == 0 cwd, _ := os.Getwd() - if len(args) == 0 { for _, v := range viper.GetStringSlice("config") { - loc, _ := lib.NewLocation(v, cwd) - locations.AddLocation(loc) + loc, _ := entrypoint.New(v, cwd, entrypoint.CONTENT_REF) + entrypoints.AddEntrypoint(loc) } - } else { + } + + // From args when config > 0 + if len(args) > 0 { for _, v := range args { - var loc *lib.Location + var loc *entrypoint.Entrypoint var err error - loc, err = lib.NewLocation(v, cwd) + loc, err = entrypoint.New(v, cwd, entrypoint.CONTENT_REF) if err != nil { // is a content ? - loc, err = lib.NewLocationContent(v, cwd) + loc, err = entrypoint.New(v, cwd, entrypoint.CONTENT_INLINE) if err != nil { - return + core.Log().Fatalln(err) } } - - locations.AddLocation(loc) + entrypoints.AddEntrypoint(loc) } } - for _, loc := range locations.Items { + for _, loc := range entrypoints.Items { agt, err := loc.ConfigAgents() if err != nil { @@ -105,26 +124,14 @@ When no configuration is passed to the command, bitfan use the config set in glo os.Exit(2) } ppl := loc.ConfigPipeline() - - // Allow pipeline customisation only when only one location was provided by user - if len(locations.Items) == 1 { - if cmd.Flags().Changed("name") { - ppl.Name, _ = cmd.Flags().GetString("name") - } - if cmd.Flags().Changed("id") { - ppl.Uuid, _ = cmd.Flags().GetString("uuid") - } - } - _, err = core.StartPipeline(&ppl, agt) if err != nil { core.Log().Errorf("error: %v", err) os.Exit(1) } + core.Log().Infof("Pipeline started %s (%s)", ppl.Name, ppl.Uuid) } - core.Log().Infoln("bitfan ready") - if service.Interactive() { // Wait for signal CTRL+C for send a stop event to all AgentProcessor // When CTRL+C, SIGINT and SIGTERM signal occurs @@ -158,11 +165,8 @@ func initRunFlags(cmd *cobra.Command) { cmd.Flags().StringP("host", "H", "127.0.0.1:5123", "Service Host to connect to") cmd.Flags().Bool("no-network", false, "Disable network (api and webhook)") - cmd.Flags().String("name", "", "set pipeline's name") - cmd.Flags().String("uuid", "", "set pipeline's uuid") cwd, _ := os.Getwd() cmd.Flags().String("data", filepath.Join(cwd, ".bitfan"), "Path to data dir") - cmd.Flags().Bool("api", true, "Expose REST Api") cmd.Flags().Bool("prometheus", false, "Export stats using prometheus output") cmd.Flags().String("prometheus.path", "/metrics", "Expose Prometheus metrics at specified path.") diff --git a/cmd/bitfan/commands/test.go b/cmd/bitfan/commands/test.go index 1bf10da4..328ae9b4 100644 --- a/cmd/bitfan/commands/test.go +++ b/cmd/bitfan/commands/test.go @@ -8,7 +8,7 @@ import ( "github.com/vjeantet/bitfan/core" config "github.com/vjeantet/bitfan/core/config" - "github.com/vjeantet/bitfan/lib" + "github.com/vjeantet/bitfan/entrypoint" ) func init() { @@ -21,20 +21,20 @@ var testCmd = &cobra.Command{ Short: "Test configurations (files, url, directories)", Run: func(cmd *cobra.Command, args []string) { - var locations lib.Locations + var locations entrypoint.EntrypointList cwd, _ := os.Getwd() for _, v := range args { - var loc *lib.Location + var loc *entrypoint.Entrypoint var err error - loc, err = lib.NewLocation(v, cwd) + loc, err = entrypoint.New(v, cwd, entrypoint.CONTENT_REF) if err != nil { - loc, err = lib.NewLocationContent(v, cwd) + loc, err = entrypoint.New(v, cwd, entrypoint.CONTENT_INLINE) if err != nil { return } } - locations.AddLocation(loc) + locations.AddEntrypoint(loc) } var cko int @@ -57,7 +57,7 @@ var testCmd = &cobra.Command{ }, } -func testConfigContent(loc *lib.Location) error { +func testConfigContent(loc *entrypoint.Entrypoint) error { configAgents, err := loc.ConfigAgents() if err != nil { return err diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 00000000..8eb6408d --- /dev/null +++ b/codecov.yml @@ -0,0 +1,3 @@ +coverage: + status: + patch: no diff --git a/core/core.go b/core/core.go index 3447f66a..4a522143 100644 --- a/core/core.go +++ b/core/core.go @@ -2,12 +2,9 @@ package core import ( "fmt" - "io/ioutil" "net/http" "os" - "path/filepath" "strings" - "time" "golang.org/x/sync/syncmap" @@ -17,9 +14,7 @@ import ( "github.com/justinas/alice" "github.com/prometheus/client_golang/prometheus" "github.com/vjeantet/bitfan/core/config" - "github.com/vjeantet/bitfan/core/models" "github.com/vjeantet/bitfan/core/store" - "github.com/vjeantet/bitfan/lib" ) var ( @@ -140,13 +135,6 @@ func Start(opt Options) { panic(err.Error()) } - if opt.AutoStart { - pipelinesToStart := myStore.FindPipelinesWithAutoStart() - for _, p := range pipelinesToStart { - StartPipelineByUUID(p.Uuid) - } - } - if len(opt.HttpHandlers) > 0 { opt.HttpHandlers = append(opt.HttpHandlers, webHookServer()) @@ -156,66 +144,6 @@ func Start(opt Options) { Log().Debugln("bitfan started") } -func StartPipelineByUUID(UUID string) error { - tPipeline, err := myStore.FindOnePipelineByUUID(UUID, true) - if err != nil { - return err - } - - uidString := fmt.Sprintf("%s_%d", tPipeline.Uuid, time.Now().Unix()) - - cwd := filepath.Join(dataLocation, "_pipelines", uidString) - Log().Debugf("configuration %s stored to %s", uidString, cwd) - os.MkdirAll(cwd, os.ModePerm) - - //Save assets to cwd - for _, asset := range tPipeline.Assets { - dest := filepath.Join(cwd, asset.Name) - dir := filepath.Dir(dest) - os.MkdirAll(dir, os.ModePerm) - if err := ioutil.WriteFile(dest, asset.Value, 07770); err != nil { - return err - } - - if asset.Type == models.ASSET_TYPE_ENTRYPOINT { - tPipeline.ConfigLocation = filepath.Join(cwd, asset.Name) - } - - if tPipeline.ConfigLocation == "" { - return fmt.Errorf("missing entrypoint for pipeline %s", tPipeline.Uuid) - } - - Log().Debugf("configuration %s asset %s stored", uidString, asset.Name) - } - - Log().Debugf("configuration %s pipeline %s ready to be loaded", uidString, tPipeline.ConfigLocation) - - //TODO : resolve lib.Location dans location.Location - - var loc *lib.Location - loc, err = lib.NewLocation(tPipeline.ConfigLocation, cwd) - if err != nil { - return err - } - - ppl := loc.ConfigPipeline() - ppl.Name = tPipeline.Label - ppl.Uuid = tPipeline.Uuid - - agt, err := loc.ConfigAgents() - if err != nil { - return err - } - - nUUID, err := StartPipeline(&ppl, agt) - if err != nil { - return err - } - - Log().Debugf("Pipeline %s started UUID=%s", tPipeline.Label, nUUID) - return nil -} - // StartPipeline load all agents form a configPipeline and returns pipeline's ID func StartPipeline(configPipeline *config.Pipeline, configAgents []config.Agent) (string, error) { p, err := newPipeline(configPipeline, configAgents) diff --git a/core/options.go b/core/options.go index f3e0aeb9..7ce68bdd 100644 --- a/core/options.go +++ b/core/options.go @@ -1,7 +1,6 @@ package core type Options struct { - AutoStart bool Host string HttpHandlers []fnMux Debug bool diff --git a/core/store/pipelines.go b/core/store/pipelines.go index 3dd8d9b3..867e5082 100644 --- a/core/store/pipelines.go +++ b/core/store/pipelines.go @@ -29,7 +29,7 @@ type StoreAssetRef struct { Type string } -func (s *Store) FindPipelinesWithAutoStart() []models.Pipeline { +func (s *Store) FindPipelinesWithAutoStart(withAssetValues bool) []models.Pipeline { pps := []models.Pipeline{} var sps []StorePipeline @@ -50,11 +50,26 @@ func (s *Store) FindPipelinesWithAutoStart() []models.Pipeline { tPipeline.AutoStart = p.AutoStart for _, a := range p.Assets { - tPipeline.Assets = append(tPipeline.Assets, models.Asset{ + asset := models.Asset{ Uuid: a.Uuid, Name: a.Label, Type: a.Type, - }) + } + + if withAssetValues { + var sas []StoreAsset + err := s.db.Find(&sas, bolthold.Where(bolthold.Key).Eq(a.Uuid)) + if err != nil { + return pps + } + if len(sas) == 0 { + return pps + } + asset.Value = sas[0].Value + asset.Size = sas[0].Size + asset.ContentType = sas[0].ContentType + } + tPipeline.Assets = append(tPipeline.Assets, asset) } pps = append(pps, tPipeline) } diff --git a/core/store/store.go b/core/store/store.go index 8c7d1a8f..a637f1cb 100644 --- a/core/store/store.go +++ b/core/store/store.go @@ -1,27 +1,69 @@ package store import ( + "fmt" "io" + "io/ioutil" + "os" "path/filepath" + "time" "github.com/boltdb/bolt" "github.com/timshannon/bolthold" + "github.com/vjeantet/bitfan/core/models" ) type Store struct { - db *bolthold.Store - log Logger + db *bolthold.Store + pipelineTmpPath string + log Logger } func New(location string, log Logger) (*Store, error) { database, err := bolthold.Open(filepath.Join(location, "bitfan.bolt.db"), 0666, nil) - return &Store{db: database, log: log}, err + pipelineTmpPath := filepath.Join(location, "_pipelines") + + return &Store{db: database, log: log, pipelineTmpPath: pipelineTmpPath}, err } func (s *Store) Close() { s.db.Close() } +func (s *Store) PipelineTmpPath(uuid string) string { + uidString := fmt.Sprintf("%s_%d", uuid, time.Now().Unix()) + pipelinePath := filepath.Join(s.pipelineTmpPath, uidString) + os.MkdirAll(pipelinePath, os.ModePerm) + return pipelinePath +} + +func (s *Store) PreparePipelineExecutionStage(tPipeline *models.Pipeline) (string, error) { + //Save assets to cwd + cwd := s.PipelineTmpPath(tPipeline.Uuid) + for _, asset := range tPipeline.Assets { + dest := filepath.Join(cwd, asset.Name) + dir := filepath.Dir(dest) + os.MkdirAll(dir, os.ModePerm) + s.log.Debugf("configuration stored to %s", cwd) + if err := ioutil.WriteFile(dest, asset.Value, 07770); err != nil { + return "", err + } + + if asset.Type == models.ASSET_TYPE_ENTRYPOINT { + tPipeline.ConfigLocation = filepath.Join(cwd, asset.Name) + } + + if tPipeline.ConfigLocation == "" { + return "", fmt.Errorf("missing entrypoint for pipeline %s", tPipeline.Uuid) + } + + s.log.Debugf("configuration %s asset %s stored", tPipeline.Uuid, asset.Name) + } + + s.log.Debugf("configuration %s pipeline %s ready to be loaded", tPipeline.Uuid, tPipeline.ConfigLocation) + return tPipeline.ConfigLocation, nil +} + // CopyTo writes the raw database's content to given io.Writer func (s *Store) CopyTo(w io.Writer) (int, error) { size := 0 diff --git a/entrypoint/entrypoint.go b/entrypoint/entrypoint.go new file mode 100644 index 00000000..5867db7f --- /dev/null +++ b/entrypoint/entrypoint.go @@ -0,0 +1,261 @@ +// Entrypoint manage pipeline's definitions to get Pipeline ready to be used by the core +package entrypoint + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/url" + "os" + "path/filepath" + "regexp" + "strings" + + "github.com/vjeantet/bitfan/core/config" + "github.com/vjeantet/bitfan/entrypoint/parser" +) + +const ( + CONTENT_REF = iota // Content is a reference to something + CONTENT_REF_FS // Content is a reference to something on the filesystem + CONTENT_REF_URL // Content is a reference to something on the web (http, https) + CONTENT_INLINE // Content is a value +) + +// Entrypoint is a the pipeline's definition +type Entrypoint struct { + Path string + Kind int // Kind of content + Workingpath string + Content string + PipelineName string + PipelineUuid string +} + +// List of Entrypoints +type EntrypointList struct { + Items []*Entrypoint +} + +// Create a new entrypoint (pipeline definition) +// +// - contentValue may be a filesystem path, a URL or a string, +// +// - cwl Working Location should be provided to the parser, it could be an filesystem dir, a baseUrl base path, this part is +// used when the entrypoint contains references to other configurations. @see use, route processors. +// +// - contentKind refer to the kind of contentValue @see CONTENT_* constants +func New(contentValue string, cwl string, contentKind int) (*Entrypoint, error) { + loc := &Entrypoint{} + + if contentKind == CONTENT_INLINE { + loc.Kind = CONTENT_INLINE + loc.Content = contentValue + return loc, nil + } + + if v, _ := url.Parse(contentValue); v.Scheme == "http" || v.Scheme == "https" { + loc.Kind = CONTENT_REF_URL + loc.Path = contentValue + } else if _, err := os.Stat(contentValue); err == nil { + var err error + loc.Kind = CONTENT_REF_FS + loc.Path, err = filepath.Abs(contentValue) + if err != nil { + return loc, err + } + } else if _, err := os.Stat(filepath.Join(cwl, contentValue)); err == nil { + loc.Kind = CONTENT_REF_FS + loc.Path = filepath.Join(cwl, contentValue) + } else if v, _ := url.Parse(cwl); v.Scheme == "http" || v.Scheme == "https" { + loc.Kind = CONTENT_REF_URL + loc.Path = cwl + contentValue + } else { + return nil, fmt.Errorf("can not find any configuration contentValue=%s, cwl=%s", contentValue, cwl) + } + + loc.Workingpath = cwl + return loc, nil +} + +// AddEntrypoint add the provided entrypoint to the list +func (e *EntrypointList) AddEntrypoint(loc *Entrypoint) error { + // if it's a file try to expand + if loc.Kind == CONTENT_REF_FS { + subpaths, err := expandFilePath(loc.Path) + if err != nil { + return err + } + if len(subpaths) == 1 { + e.Items = append(e.Items, loc) + } else { + for _, subpath := range subpaths { + subloc := &Entrypoint{ + Path: subpath, + Workingpath: loc.Workingpath, + Kind: loc.Kind, + } + e.Items = append(e.Items, subloc) + } + } + + return nil + } + + e.Items = append(e.Items, loc) + + return nil +} + +// ConfigPipeline returns a core Pipeline from entrypoint's definition +func (e *Entrypoint) ConfigPipeline() config.Pipeline { + var pipeline *config.Pipeline + + switch e.Kind { + case CONTENT_INLINE: + pipeline = config.NewPipeline("inline", "nodescription", "inline") + case CONTENT_REF_URL: + uriSegments := strings.Split(e.Path, "/") + pipelineName := strings.Join(uriSegments[2:], ".") + pipeline = config.NewPipeline(pipelineName, "nodescription", e.Path) + case CONTENT_REF_FS: + filename := filepath.Base(e.Path) + extension := filepath.Ext(filename) + pipelineName := filename[0 : len(filename)-len(extension)] + pipeline = config.NewPipeline(pipelineName, "nodescription", e.Path) + } + + if e.PipelineName != "" { + pipeline.Name = e.PipelineName + } + if e.PipelineUuid != "" { + pipeline.Uuid = e.PipelineUuid + } + + return *pipeline +} + +// ConfigPipeline returns core agents from entrypoint's definition +func (e *Entrypoint) ConfigAgents() ([]config.Agent, error) { + var agents []config.Agent + var content []byte + var err error + var cwd string + + content, cwd, err = e.content(map[string]interface{}{}) + if err != nil { + return agents, err + } + + agents, err = parser.BuildAgents(content, cwd, entrypointContent) + return agents, err +} + +func entrypointContent(path string, cwl string, options map[string]interface{}) ([]byte, string, error) { + e, err := New(path, cwl, CONTENT_REF) + if err != nil { + return nil, "", err + } + return e.content(options) +} + +func (e *Entrypoint) content(options map[string]interface{}) ([]byte, string, error) { + var content []byte + var cwl string + var err error + + switch e.Kind { + case CONTENT_INLINE: + content = []byte(e.Content) + cwl = e.Workingpath + + case CONTENT_REF_URL: + response, err := http.Get(e.Path) + if err != nil { + return content, cwl, err + } else { + content, err = ioutil.ReadAll(response.Body) + response.Body.Close() + if err != nil { + return content, cwl, err + } + } + + uriSegments := strings.Split(e.Path, "/") + cwl = strings.Join(uriSegments[:len(uriSegments)-1], "/") + "/" + + case CONTENT_REF_FS: + + // relative .Path ? + if false == filepath.IsAbs(e.Path) { + e.Path = filepath.Join(e.Workingpath, e.Path) + } + + content, err = ioutil.ReadFile(e.Path) + if err != nil { + return content, cwl, fmt.Errorf(`Error while reading "%s" [%v]`, e.Path, err) + } + cwl = filepath.Dir(e.Path) + } + + // find ${FOO:default value} and replace with + // var["FOO"] if found + // environnement variaable FOO if env variable exists + // default value, empty when not provided + contentString := string(content) + r, _ := regexp.Compile(`\${([a-zA-Z_\-0-9]+):?([^"'}]*)}`) + envVars := r.FindAllStringSubmatch(contentString, -1) + for _, envVar := range envVars { + varText := envVar[0] + varName := envVar[1] + varDefaultValue := envVar[2] + + if values, ok := options["var"]; ok { + if value, ok := values.(map[string]interface{})[varName]; ok { + contentString = strings.Replace(contentString, varText, value.(string), -1) + continue + } + } + // Lookup for env + if value, found := os.LookupEnv(varName); found { + contentString = strings.Replace(contentString, varText, value, -1) + continue + } + // Set default value + contentString = strings.Replace(contentString, varText, varDefaultValue, -1) + continue + } + content = []byte(contentString) + + return content, cwl, err + +} + +func expandFilePath(path string) ([]string, error) { + locs := []string{} + if fi, err := os.Stat(path); err == nil { + + if false == fi.IsDir() { + locs = append(locs, path) + return locs, nil + } + files, err := filepath.Glob(filepath.Join(path, "*.*")) + if err != nil { + return locs, err + + } + //use each file + for _, file := range files { + switch strings.ToLower(filepath.Ext(file)) { + case ".conf": + locs = append(locs, file) + continue + default: + + } + } + } else { + return locs, fmt.Errorf("%s not found", path) + } + return locs, nil +} diff --git a/lib/location_test.go b/entrypoint/entrypoint_test.go similarity index 78% rename from lib/location_test.go rename to entrypoint/entrypoint_test.go index 3fd09d2d..12a7719d 100644 --- a/lib/location_test.go +++ b/entrypoint/entrypoint_test.go @@ -1,4 +1,4 @@ -package lib +package entrypoint import "testing" diff --git a/parser/conditionalexpression/expression.go b/entrypoint/parser/logstash/expression.go similarity index 92% rename from parser/conditionalexpression/expression.go rename to entrypoint/parser/logstash/expression.go index 43af3bbd..f929bc69 100644 --- a/parser/conditionalexpression/expression.go +++ b/entrypoint/parser/logstash/expression.go @@ -1,4 +1,4 @@ -package conditionalexpression +package logstash import ( "bytes" @@ -9,7 +9,8 @@ import ( "github.com/vjeantet/go-lexer" ) -func ToWhenExpression(lsExpression string) (string, error) { +// returns a bitfan compatible expression from a logstash one +func toWhenExpression(lsExpression string) (string, error) { lsExpression = fixNotInExpr(lsExpression) r := bytes.NewReader([]byte(lsExpression)) l := lexer.New(r, lexBegin) diff --git a/parser/conditionalexpression/lexer.go b/entrypoint/parser/logstash/expression_lexer.go similarity index 99% rename from parser/conditionalexpression/lexer.go rename to entrypoint/parser/logstash/expression_lexer.go index e997dc3c..8032f678 100644 --- a/parser/conditionalexpression/lexer.go +++ b/entrypoint/parser/logstash/expression_lexer.go @@ -1,4 +1,4 @@ -package conditionalexpression +package logstash import ( "strings" diff --git a/parser/conditionalexpression/expression_test.go b/entrypoint/parser/logstash/expression_test.go similarity index 97% rename from parser/conditionalexpression/expression_test.go rename to entrypoint/parser/logstash/expression_test.go index 85d8108f..12f846ef 100644 --- a/parser/conditionalexpression/expression_test.go +++ b/entrypoint/parser/logstash/expression_test.go @@ -1,4 +1,4 @@ -package conditionalexpression +package logstash import ( "testing" @@ -172,7 +172,7 @@ func TestExpressions(t *testing.T) { } func check(t *testing.T, lsExpression string, gvExpression string) { - result, err := ToWhenExpression(lsExpression) + result, err := toWhenExpression(lsExpression) assert.NoError(t, err, "err is not nil") assert.Equal(t, result, gvExpression) } diff --git a/parser/lexer.go b/entrypoint/parser/logstash/lexer.go similarity index 97% rename from parser/lexer.go rename to entrypoint/parser/logstash/lexer.go index a6bd6617..4a884b9d 100644 --- a/parser/lexer.go +++ b/entrypoint/parser/logstash/lexer.go @@ -1,4 +1,4 @@ -package parser +package logstash import ( "bytes" @@ -9,11 +9,11 @@ import ( "unicode" ) -func readToken(stream *lexerStream) (Token, error) { - var ret Token +func readToken(stream *lexerStream) (token, error) { + var ret token var tokenValue interface{} var tokenString string - var kind TokenKind + var kind tokenKind var character rune var completed bool var err error @@ -157,7 +157,7 @@ func readToken(stream *lexerStream) (Token, error) { tokenValue, completed = readUntilFalse(stream, true, false, true, isNotQuoteS(character)) if !completed { - return Token{}, errors.New("Unclosed string literal") + return token{}, errors.New("Unclosed string literal") } // advance the stream one position, since reading until false assumes the terminator is a real token diff --git a/parser/lexerStream.go b/entrypoint/parser/logstash/lexerStream.go similarity index 98% rename from parser/lexerStream.go rename to entrypoint/parser/logstash/lexerStream.go index af0e5653..2d85e59d 100644 --- a/parser/lexerStream.go +++ b/entrypoint/parser/logstash/lexerStream.go @@ -1,4 +1,4 @@ -package parser +package logstash type lexerStream struct { source []rune diff --git a/parser/parser.go b/entrypoint/parser/logstash/parser.go similarity index 81% rename from parser/parser.go rename to entrypoint/parser/logstash/parser.go index 3614bc3e..dab73c3c 100644 --- a/parser/parser.go +++ b/entrypoint/parser/logstash/parser.go @@ -1,14 +1,11 @@ -package parser +package logstash import ( "bytes" "fmt" "io" - "log" "strconv" "strings" - - "github.com/vjeantet/bitfan/parser/conditionalexpression" ) type Parser struct { @@ -55,7 +52,7 @@ type ParseError struct { Reason string } -func NewParseError(l int, c int, message string) *ParseError { +func newParseError(l int, c int, message string) *ParseError { return &ParseError{ Line: l, Column: c, @@ -75,7 +72,7 @@ func NewParser(r io.Reader) *Parser { func (p *Parser) Parse() (*Configuration, error) { var err error - var tok Token + var tok token config := &Configuration{ Sections: map[string]*Section{}, @@ -110,10 +107,10 @@ func (p *Parser) Parse() (*Configuration, error) { return config, err } -func (p *Parser) parseSection(tok *Token) (*Section, error) { +func (p *Parser) parseSection(tok *token) (*Section, error) { section := &Section{} if tok.Value != "input" && tok.Value != "filter" && tok.Value != "output" { - return section, NewParseError(tok.Line, tok.Col, fmt.Sprintf("unexpected '%s', exepected one of 'input', 'filter' or 'output'", tok.Value)) + return section, newParseError(tok.Line, tok.Col, fmt.Sprintf("unexpected '%s', exepected one of 'input', 'filter' or 'output'", tok.Value)) } section.Name = tok.Value.(string) @@ -178,16 +175,16 @@ func (p *Parser) parseSection(tok *Token) (*Section, error) { return section, err } -func (p *Parser) parseWHEN(tok *Token) (*Plugin, error) { +func (p *Parser) parseWHEN(tok *token) (*Plugin, error) { pluginWhen := &Plugin{} pluginWhen.Name = "when" pluginWhen.When = make(map[int]*When) var err error - expression, errc := conditionalexpression.ToWhenExpression(tok.Value.(string)) + expression, errc := toWhenExpression(tok.Value.(string)) if errc != nil { - return pluginWhen, NewParseError(tok.Line, tok.Col, "Conditional expression parse error : "+errc.Error()) + return pluginWhen, newParseError(tok.Line, tok.Col, "Conditional expression parse error : "+errc.Error()) } when := &When{ @@ -256,7 +253,7 @@ func (p *Parser) parseWHEN(tok *Token) (*Plugin, error) { return pluginWhen, err } -func (p *Parser) parsePlugin(tok *Token) (*Plugin, error) { +func (p *Parser) parsePlugin(tok *token) (*Plugin, error) { var err error plugin := &Plugin{} @@ -279,7 +276,7 @@ func (p *Parser) parsePlugin(tok *Token) (*Plugin, error) { i := 0 iCodec := 0 - var advancedTok *Token + var advancedTok *token for { if advancedTok == nil { *tok, err = p.getToken(TokenComment, TokenString, TokenRCurlyBrace, TokenComma) @@ -330,7 +327,7 @@ func (p *Parser) parsePlugin(tok *Token) (*Plugin, error) { return plugin, err } -func (p *Parser) parseCodecSettings(tok *Token) (map[int]*Setting, error) { +func (p *Parser) parseCodecSettings(tok *token) (map[int]*Setting, error) { var err error settings := make(map[int]*Setting) @@ -362,7 +359,7 @@ func (p *Parser) parseCodecSettings(tok *Token) (map[int]*Setting, error) { return settings, err } -func (p *Parser) parseCodec(tok *Token) (*Codec, *Token, error) { +func (p *Parser) parseCodec(tok *token) (*Codec, *token, error) { var err error codec := &Codec{} @@ -417,7 +414,7 @@ func (p *Parser) parseCodec(tok *Token) (*Codec, *Token, error) { return codec, nil, err } -func (p *Parser) parseSetting(tok *Token) (*Setting, error) { +func (p *Parser) parseSetting(tok *token) (*Setting, error) { setting := &Setting{} setting.K = tok.Value.(string) @@ -548,16 +545,16 @@ func (p *Parser) rewindToken() error { return nil } -func (p *Parser) getToken(types ...TokenKind) (Token, error) { +func (p *Parser) getToken(types ...tokenKind) (token, error) { tok, err := readToken(p.l) if err != nil { - return Token{}, NewParseError(tok.Line, tok.Col, fmt.Sprintf("illegal token '%s'", tok.Value)) + return token{}, newParseError(tok.Line, tok.Col, fmt.Sprintf("illegal token '%s'", tok.Value)) } if tok.Kind == TokenIllegal { // log.Printf(" -- %s %s", TokenType(tok.Kind).String(), tok.Value) - return Token{}, NewParseError(tok.Line, tok.Col, fmt.Sprintf("illegal token '%s'", tok.Value)) + return token{}, newParseError(tok.Line, tok.Col, fmt.Sprintf("illegal token '%s'", tok.Value)) } for _, t := range types { @@ -567,55 +564,56 @@ func (p *Parser) getToken(types ...TokenKind) (Token, error) { } if len(types) == 1 { - return tok, NewParseError(tok.Line, tok.Col, fmt.Sprintf("unexpected token '%s', expected '%s' ", tok.Value, GetTokenKindHumanString(types[0]))) + return tok, newParseError(tok.Line, tok.Col, fmt.Sprintf("unexpected token '%s', expected '%s' ", tok.Value, getTokenKindHumanString(types[0]))) } list := make([]string, len(types)) for i, t := range types { - list[i] = GetTokenKindHumanString(t) + list[i] = getTokenKindHumanString(t) } - return tok, NewParseError(tok.Line, tok.Col, fmt.Sprintf("unexpected token '%s', expected one of '%s' ", tok.Value, strings.Join(list, "|"))) + return tok, newParseError(tok.Line, tok.Col, fmt.Sprintf("unexpected token '%s', expected one of '%s' ", tok.Value, strings.Join(list, "|"))) } -func DumpTokens(content []byte) { - var ret []Token - var token Token - var stream *lexerStream - var err error - - stream = newLexerStream(string(content)) - for stream.canRead() { - - token, err = readToken(stream) - - if err != nil { - fmt.Printf("ERROR %v\n", err) - return - } - - if token.Kind == TokenIllegal { - fmt.Printf("ERROR %v\n", err) - color := "\033[93m" - log.Printf("ERROR %4d line %3d:%-2d %s%-20s\033[0m _\033[92m%s\033[0m_", token.Pos, token.Line, token.Col, color, GetTokenKindHumanString(token.Kind), token.Value) - break - } - - // state, err = getLexerStateForToken(token.Kind) - // if err != nil { - // return - // } - color := "\033[93m" - if token.Kind == TokenIf || token.Kind == TokenElseIf || token.Kind == TokenElse { - color = "\033[1m\033[91m" - } - if token.Kind == TokenLBracket || token.Kind == TokenRBracket || token.Kind == TokenRCurlyBrace || token.Kind == TokenLCurlyBrace { - color = "\033[90m" - } - - log.Printf("%4d line %3d:%-2d %s%-20s\033[0m _\033[92m%s\033[0m_", token.Pos, token.Line, token.Col, color, GetTokenKindHumanString(token.Kind), token.Value) - - // append this valid token - ret = append(ret, token) - } -} +// For Dev +// func DumpTokens(content []byte) { +// var ret []token +// var tok token +// var stream *lexerStream +// var err error + +// stream = newLexerStream(string(content)) +// for stream.canRead() { + +// tok, err = readToken(stream) + +// if err != nil { +// fmt.Printf("ERROR %v\n", err) +// return +// } + +// if tok.Kind == TokenIllegal { +// fmt.Printf("ERROR %v\n", err) +// color := "\033[93m" +// log.Printf("ERROR %4d line %3d:%-2d %s%-20s\033[0m _\033[92m%s\033[0m_", tok.Pos, tok.Line, tok.Col, color, getTokenKindHumanString(tok.Kind), tok.Value) +// break +// } + +// // state, err = getLexerStateForToken(tok.Kind) +// // if err != nil { +// // return +// // } +// color := "\033[93m" +// if tok.Kind == TokenIf || tok.Kind == TokenElseIf || tok.Kind == TokenElse { +// color = "\033[1m\033[91m" +// } +// if tok.Kind == TokenLBracket || tok.Kind == TokenRBracket || tok.Kind == TokenRCurlyBrace || tok.Kind == TokenLCurlyBrace { +// color = "\033[90m" +// } + +// log.Printf("%4d line %3d:%-2d %s%-20s\033[0m _\033[92m%s\033[0m_", tok.Pos, tok.Line, tok.Col, color, getTokenKindHumanString(tok.Kind), tok.Value) + +// // append this valid tok +// ret = append(ret, tok) +// } +// } diff --git a/parser/parser_test.go b/entrypoint/parser/logstash/parser_test.go similarity index 80% rename from parser/parser_test.go rename to entrypoint/parser/logstash/parser_test.go index 53e72715..d1e0b365 100644 --- a/parser/parser_test.go +++ b/entrypoint/parser/logstash/parser_test.go @@ -1,4 +1,4 @@ -package parser +package logstash import "testing" diff --git a/parser/token.go b/entrypoint/parser/logstash/token.go similarity index 84% rename from parser/token.go rename to entrypoint/parser/logstash/token.go index 1f41697f..93214e3d 100644 --- a/parser/token.go +++ b/entrypoint/parser/logstash/token.go @@ -1,10 +1,10 @@ -package parser +package logstash import "fmt" // Represents a single parsed token. -type Token struct { - Kind TokenKind +type token struct { + Kind tokenKind Value interface{} Pos int Line int @@ -12,10 +12,10 @@ type Token struct { } // Represents all valid types of tokens that a token can be. -type TokenKind int +type tokenKind int const ( - TokenIllegal TokenKind = iota + 1 + TokenIllegal tokenKind = iota + 1 TokenEOF TokenAssignment TokenLCurlyBrace @@ -32,12 +32,12 @@ const ( TokenBool ) -func (t *Token) String() string { - return fmt.Sprintf("%s '%s'", GetTokenKindString(t.Kind), t.Value) +func (t *token) String() string { + return fmt.Sprintf("%s '%s'", getTokenKindString(t.Kind), t.Value) } -// GetTokenKindString returns a string that describes the given TokenKind. -func GetTokenKindString(kind TokenKind) string { +// GetTokenKindString returns a string that describes the given tokenKind. +func getTokenKindString(kind tokenKind) string { switch kind { @@ -76,7 +76,7 @@ func GetTokenKindString(kind TokenKind) string { return "TokenIllegal" } -func GetTokenKindHumanString(kind TokenKind) string { +func getTokenKindHumanString(kind tokenKind) string { switch kind { diff --git a/lib/pipelinebuilder.go b/entrypoint/parser/pipelinebuilder.go similarity index 90% rename from lib/pipelinebuilder.go rename to entrypoint/parser/pipelinebuilder.go index 8d0e0660..a2370dd2 100644 --- a/lib/pipelinebuilder.go +++ b/entrypoint/parser/pipelinebuilder.go @@ -1,4 +1,4 @@ -package lib +package parser import ( "bytes" @@ -6,24 +6,29 @@ import ( "strconv" "github.com/vjeantet/bitfan/core/config" - "github.com/vjeantet/bitfan/parser" + "github.com/vjeantet/bitfan/entrypoint/parser/logstash" ) -func parseConfigLocation(name string, path string, options map[string]interface{}, pwd string, pickSections ...string) ([]config.Agent, error) { - var locs Locations +var entryPointContent func(string, string,map[string]interface{}) ([]byte, string, error) - if path != "" { - loc, err := NewLocation(path, pwd) - if err != nil { - return nil, err - } - - locs.AddLocation(loc) - } else { +func parseConfigLocation(path string, options map[string]interface{}, pwd string, pickSections ...string) ([]config.Agent, error) { + if path == "" { return []config.Agent{}, fmt.Errorf("no location provided to get content from ; options=%v ", options) } - return locs.Items[0].configAgentsWithOptions(options, pickSections...) + content, cwd, err := entryPointContent(path, pwd,options) + + if err != nil { + return nil, err + } + + agents, err := buildAgents(content, cwd, pickSections...) + return agents, err +} + +func BuildAgents(content []byte, pwd string, contentProvider func(string, string, map[string]interface{}) ([]byte, string, error)) ([]config.Agent, error) { + entryPointContent = contentProvider + return buildAgents(content, pwd) } func buildAgents(content []byte, pwd string, pickSections ...string) ([]config.Agent, error) { @@ -33,7 +38,7 @@ func buildAgents(content []byte, pwd string, pickSections ...string) ([]config.A pickSections = []string{"input", "filter", "output"} } - p := parser.NewParser(bytes.NewReader(content)) + p := logstash.NewParser(bytes.NewReader(content)) LSConfiguration, err := p.Parse() @@ -91,7 +96,7 @@ func buildAgents(content []byte, pwd string, pickSections ...string) ([]config.A } // TODO : this should return ports to be able to use multiple path use -func buildInputAgents(plugin *parser.Plugin, pwd string) ([]config.Agent, []config.Port, error) { +func buildInputAgents(plugin *logstash.Plugin, pwd string) ([]config.Agent, []config.Port, error) { var agent config.Agent agent = config.NewAgent() @@ -138,7 +143,7 @@ func buildInputAgents(plugin *parser.Plugin, pwd string) ([]config.Agent, []conf switch v.(type) { case string: agent.Options["path"] = []string{v.(string)} - fileConfigAgents, err := parseConfigLocation("", v.(string), agent.Options, pwd, "input", "filter") + fileConfigAgents, err := parseConfigLocation(v.(string), agent.Options, pwd, "input", "filter") if err != nil { return nil, nil, err } @@ -155,7 +160,7 @@ func buildInputAgents(plugin *parser.Plugin, pwd string) ([]config.Agent, []conf newOutPorts := []config.Port{} for _, p := range v.([]interface{}) { // contruire le pipeline a - fileConfigAgents, err := parseConfigLocation("", p.(string), agent.Options, pwd, "input", "filter") + fileConfigAgents, err := parseConfigLocation(p.(string), agent.Options, pwd, "input", "filter") if err != nil { return nil, nil, err } @@ -220,7 +225,7 @@ func buildInputAgents(plugin *parser.Plugin, pwd string) ([]config.Agent, []conf return []config.Agent{agent}, []config.Port{outPort}, nil } -func buildOutputAgents(plugin *parser.Plugin, lastOutPorts []config.Port, pwd string) ([]config.Agent, error) { +func buildOutputAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd string) ([]config.Agent, error) { agent_list := []config.Agent{} var agent config.Agent @@ -273,7 +278,7 @@ func buildOutputAgents(plugin *parser.Plugin, lastOutPorts []config.Port, pwd st switch v.(type) { case string: agent.Options["path"] = []string{v.(string)} - fileConfigAgents, err := parseConfigLocation("", v.(string), agent.Options, pwd, "filter", "output") + fileConfigAgents, err := parseConfigLocation(v.(string), agent.Options, pwd, "filter", "output") if err != nil { return nil, err } @@ -290,7 +295,7 @@ func buildOutputAgents(plugin *parser.Plugin, lastOutPorts []config.Port, pwd st case []interface{}: CombinedFileConfigAgents := []config.Agent{} for _, p := range v.([]interface{}) { - fileConfigAgents, err := parseConfigLocation("", p.(string), agent.Options, pwd, "filter", "output") + fileConfigAgents, err := parseConfigLocation(p.(string), agent.Options, pwd, "filter", "output") if err != nil { return nil, err } @@ -368,7 +373,7 @@ func buildOutputAgents(plugin *parser.Plugin, lastOutPorts []config.Port, pwd st return agent_list, nil } -func buildFilterAgents(plugin *parser.Plugin, lastOutPorts []config.Port, pwd string) ([]config.Agent, []config.Port, error) { +func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd string) ([]config.Agent, []config.Port, error) { agent_list := []config.Agent{} @@ -419,7 +424,7 @@ func buildFilterAgents(plugin *parser.Plugin, lastOutPorts []config.Port, pwd st switch v.(type) { case string: agent.Options["path"] = []string{v.(string)} - fileConfigAgents, err := parseConfigLocation("", v.(string), agent.Options, pwd, "filter") + fileConfigAgents, err := parseConfigLocation(v.(string), agent.Options, pwd, "filter") if err != nil { return nil, nil, err } @@ -440,7 +445,7 @@ func buildFilterAgents(plugin *parser.Plugin, lastOutPorts []config.Port, pwd st newOutPorts := []config.Port{} for _, p := range v.([]interface{}) { // contruire le pipeline a - fileConfigAgents, err := parseConfigLocation("", p.(string), agent.Options, pwd, "filter") + fileConfigAgents, err := parseConfigLocation(p.(string), agent.Options, pwd, "filter") if err != nil { return nil, nil, err } @@ -471,7 +476,7 @@ func buildFilterAgents(plugin *parser.Plugin, lastOutPorts []config.Port, pwd st if plugin.Name == "route" { CombinedFileConfigAgents := []config.Agent{} for _, p := range agent.Options["path"].([]interface{}) { - fileConfigAgents, err := parseConfigLocation("", p.(string), agent.Options, pwd, "filter", "output") + fileConfigAgents, err := parseConfigLocation(p.(string), agent.Options, pwd, "filter", "output") if err != nil { return nil, nil, err } diff --git a/go.list-dependencies.sh b/go.list-dependencies.sh index f10f724e..7777eb22 100755 --- a/go.list-dependencies.sh +++ b/go.list-dependencies.sh @@ -4,4 +4,4 @@ go list -f ' {{$dir := ""}} {{range $imp := .Deps}} {{printf "%s %s\n" $imp $dir}} -{{end}}' ./... | sort | uniq | grep "\." \ No newline at end of file +{{end}}' $1 | sort | uniq | grep "\." \ No newline at end of file diff --git a/lib/location.go b/lib/location.go deleted file mode 100644 index d26a8c7c..00000000 --- a/lib/location.go +++ /dev/null @@ -1,283 +0,0 @@ -package lib - -import ( - "encoding/base64" - "fmt" - "io/ioutil" - "net/http" - "net/url" - "os" - "path/filepath" - "regexp" - "strings" - - "github.com/vjeantet/bitfan/core/config" -) - -const ( - CONTENT_FS = iota + 1 - CONTENT_URL - CONTENT_INLINE -) - -type Location struct { - Path string - Kind int - Workingpath string - Content string -} - -type Locations struct { - Items []*Location -} - -func NewLocationContent(content string, cwl string) (*Location, error) { - loc := &Location{ - Kind: CONTENT_INLINE, - Content: content, - } - return loc, nil -} - -func NewLocation(ref string, cwl string) (*Location, error) { - loc := &Location{} - - if v, _ := url.Parse(ref); v.Scheme == "http" || v.Scheme == "https" { - loc.Kind = CONTENT_URL - loc.Path = ref - } else if _, err := os.Stat(ref); err == nil { - var err error - loc.Kind = CONTENT_FS - loc.Path, err = filepath.Abs(ref) - if err != nil { - return loc, err - } - } else if _, err := os.Stat(filepath.Join(cwl, ref)); err == nil { - loc.Kind = CONTENT_FS - loc.Path = filepath.Join(cwl, ref) - } else if v, _ := url.Parse(cwl); v.Scheme == "http" || v.Scheme == "https" { - loc.Kind = CONTENT_URL - loc.Path = cwl + ref - } else { - return nil, fmt.Errorf("can not find any configuration ref=%s, cwl=%s", ref, cwl) - } - - loc.Workingpath = cwl - return loc, nil -} -func (l *Locations) AddLocation(loc *Location) error { - - // if it's a file try to expand - if loc.Kind == CONTENT_FS { - subpaths, err := expandFilePath(loc.Path) - if err != nil { - return err - } - - for _, subpath := range subpaths { - subloc := &Location{ - Path: subpath, - Workingpath: loc.Workingpath, - Kind: loc.Kind, - } - l.Items = append(l.Items, subloc) - } - } else { - l.Items = append(l.Items, loc) - } - - return nil -} - -func (l *Location) ConfigPipeline() config.Pipeline { - var pipeline *config.Pipeline - - switch l.Kind { - case CONTENT_INLINE: - pipeline = config.NewPipeline("inline", "nodescription", "inline") - case CONTENT_URL: - uriSegments := strings.Split(l.Path, "/") - pipelineName := strings.Join(uriSegments[2:], ".") - pipeline = config.NewPipeline(pipelineName, "nodescription", l.Path) - case CONTENT_FS: - filename := filepath.Base(l.Path) - extension := filepath.Ext(filename) - pipelineName := filename[0 : len(filename)-len(extension)] - pipeline = config.NewPipeline(pipelineName, "nodescription", l.Path) - } - - return *pipeline -} - -func (l *Location) ConfigAgents() ([]config.Agent, error) { - return l.configAgentsWithOptions(map[string]interface{}{}, "input", "filter", "output") -} - -func (l *Location) configAgentsWithOptions(options map[string]interface{}, pickSections ...string) ([]config.Agent, error) { - var agents []config.Agent - var content []byte - var err error - var cwd string - content, cwd, err = l.content(options) - if err != nil { - return agents, err - } - - agents, err = buildAgents(content, cwd, pickSections...) - return agents, err -} - -// AssetsContent return a map of all files in folder named like the configuration file -// simple.conf -> simple/ -func (l *Location) AssetsContent() map[string]string { - assets := map[string]string{} - - if l.Kind != CONTENT_FS { - return assets - } - - b64c, err := b64EncodeFilePath(l.Path) - if err != nil { - fmt.Printf("location Asset Error %v\n", err) - return nil - } - relativePath, _ := filepath.Rel(filepath.Dir(l.Path), l.Path) - assets[relativePath] = b64c - - filename := filepath.Base(l.Path) - extension := filepath.Ext(filename) - confName := filename[0 : len(filename)-len(extension)] - confDir := filepath.Join(filepath.Dir(l.Path), confName) - - err = filepath.Walk(confDir, func(path string, f os.FileInfo, err error) error { - if path != confDir && filepath.Base(path) != ".DS_Store" && !IsDirectory(path) { - b64c, err := b64EncodeFilePath(path) - if err != nil { - fmt.Printf("location Asset Error %v\n", err) - return nil - } - relativePath, _ := filepath.Rel(filepath.Dir(l.Path), path) - assets[relativePath] = b64c - } - return nil - }) - - if err != nil { - fmt.Printf("AssetsContent error - %v\n", err) - } - - return assets -} -func IsDirectory(path string) bool { - fileInfo, _ := os.Stat(path) - return fileInfo.IsDir() -} -func b64EncodeFilePath(path string) (string, error) { - buff, err := ioutil.ReadFile(path) - if err != nil { - return "", err - } - - return base64.StdEncoding.EncodeToString(buff), nil -} - -func (l *Location) content(options map[string]interface{}) ([]byte, string, error) { - var content []byte - var cwl string - var err error - - switch l.Kind { - case CONTENT_INLINE: - content = []byte(l.Content) - cwl = l.Workingpath - - case CONTENT_URL: - response, err := http.Get(l.Path) - if err != nil { - return content, cwl, err - } else { - content, err = ioutil.ReadAll(response.Body) - response.Body.Close() - if err != nil { - return content, cwl, err - } - } - - uriSegments := strings.Split(l.Path, "/") - cwl = strings.Join(uriSegments[:len(uriSegments)-1], "/") + "/" - - case CONTENT_FS: - - // si location est relatif - if false == filepath.IsAbs(l.Path) { - l.Path = filepath.Join(l.Workingpath, l.Path) - } - - content, err = ioutil.ReadFile(l.Path) - if err != nil { - return content, cwl, fmt.Errorf(`Error while reading "%s" [%v]`, l.Path, err) - } - cwl = filepath.Dir(l.Path) - } - - // find ${FOO:default value} and replace with - // var["FOO"] if found - // environnement variaable FOO if env variable exists - // default value, empty when not provided - contentString := string(content) - r, _ := regexp.Compile(`\${([a-zA-Z_\-0-9]+):?([^"'}]*)}`) - envVars := r.FindAllStringSubmatch(contentString, -1) - for _, envVar := range envVars { - varText := envVar[0] - varName := envVar[1] - varDefaultValue := envVar[2] - - if values, ok := options["var"]; ok { - if value, ok := values.(map[string]interface{})[varName]; ok { - contentString = strings.Replace(contentString, varText, value.(string), -1) - continue - } - } - // Lookup for env - if value, found := os.LookupEnv(varName); found { - contentString = strings.Replace(contentString, varText, value, -1) - continue - } - // Set default value - contentString = strings.Replace(contentString, varText, varDefaultValue, -1) - continue - } - content = []byte(contentString) - - return content, cwl, err - -} - -func expandFilePath(path string) ([]string, error) { - locs := []string{} - if fi, err := os.Stat(path); err == nil { - - if false == fi.IsDir() { - locs = append(locs, path) - return locs, nil - } - files, err := filepath.Glob(filepath.Join(path, "*.*")) - if err != nil { - return locs, err - - } - //use each file - for _, file := range files { - switch strings.ToLower(filepath.Ext(file)) { - case ".conf": - locs = append(locs, file) - continue - default: - - } - } - } else { - return locs, fmt.Errorf("%s not found", path) - } - return locs, nil -}