Skip to content
This repository has been archived by the owner on Nov 19, 2020. It is now read-only.

Commit

Permalink
refactor lib and parser sub package (#59)
Browse files Browse the repository at this point in the history
* lib and parser refactored to entrypoint
* add codecov.yml
  • Loading branch information
vjeantet authored Nov 22, 2017
1 parent 4bba5c8 commit 1aefd24
Show file tree
Hide file tree
Showing 22 changed files with 532 additions and 523 deletions.
10 changes: 5 additions & 5 deletions api/assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
uuid "github.com/nu7hatch/gouuid"
"github.com/vjeantet/bitfan/core"
"github.com/vjeantet/bitfan/core/models"
"github.com/vjeantet/bitfan/parser"
"github.com/vjeantet/bitfan/entrypoint/parser/logstash"
)

type AssetApiController struct {
Expand All @@ -26,13 +26,13 @@ func (a *AssetApiController) CheckSyntax(c *gin.Context) {
return
}

_, err = parser.NewParser(bytes.NewReader(asset.Value)).Parse()
_, err = logstash.NewParser(bytes.NewReader(asset.Value)).Parse()
if err != nil {
c.JSON(200, gin.H{
"l": err.(*parser.ParseError).Line,
"c": err.(*parser.ParseError).Column,
"l": err.(*logstash.ParseError).Line,
"c": err.(*logstash.ParseError).Column,
"uuid": asset.Uuid,
"m": err.(*parser.ParseError).Reason,
"m": err.(*logstash.ParseError).Reason,
})
} else {
c.JSON(200, gin.H{
Expand Down
42 changes: 39 additions & 3 deletions api/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
uuid "github.com/nu7hatch/gouuid"
"github.com/vjeantet/bitfan/core"
"github.com/vjeantet/bitfan/core/models"
"github.com/vjeantet/bitfan/entrypoint"
"github.com/vjeantet/jodaTime"
)

Expand Down Expand Up @@ -80,7 +81,7 @@ func (p *PipelineApiController) Create(c *gin.Context) {

// Handle optinal Start
if pipeline.Active == true {
err = core.StartPipelineByUUID(pipeline.Uuid)
err = p.startPipelineByUUID(pipeline.Uuid)
if err != nil {
c.JSON(500, models.Error{Message: err.Error()})
return
Expand All @@ -90,6 +91,41 @@ func (p *PipelineApiController) Create(c *gin.Context) {
c.Redirect(302, fmt.Sprintf("/%s/pipelines/%s", p.path, pipeline.Uuid))
}

func (p *PipelineApiController) startPipelineByUUID(UUID string) error {
tPipeline, err := core.Storage().FindOnePipelineByUUID(UUID, true)
if err != nil {
return err
}

entryPointPath, err := core.Storage().PreparePipelineExecutionStage(&tPipeline)
if err != nil {
return err
}

var loc *entrypoint.Entrypoint
loc, err = entrypoint.New(entryPointPath, "", entrypoint.CONTENT_REF)
if err != nil {
return err
}

ppl := loc.ConfigPipeline()
ppl.Name = tPipeline.Label
ppl.Uuid = tPipeline.Uuid

agt, err := loc.ConfigAgents()
if err != nil {
return err
}

nUUID, err := core.StartPipeline(&ppl, agt)
if err != nil {
return err
}

apiLogger.Debugf("Pipeline %s started UUID=%s", tPipeline.Label, nUUID)
return nil
}

func (p *PipelineApiController) Find(c *gin.Context) {

pipelines := core.Storage().FindPipelines(false)
Expand Down Expand Up @@ -162,7 +198,7 @@ func (p *PipelineApiController) UpdateByUUID(c *gin.Context) {
c.JSON(500, models.Error{Message: err.Error()})
return
}
err = core.StartPipelineByUUID(uuid)
err = p.startPipelineByUUID(uuid)
if err != nil {
c.JSON(500, models.Error{Message: err.Error()})
return
Expand All @@ -179,7 +215,7 @@ func (p *PipelineApiController) UpdateByUUID(c *gin.Context) {
switch nextActive {
case true: // start pipeline
apiLogger.Debugf("starting pipeline %s", uuid)
err := core.StartPipelineByUUID(uuid)
err := p.startPipelineByUUID(uuid)
if err != nil {
c.JSON(500, models.Error{Message: err.Error()})
return
Expand Down
74 changes: 39 additions & 35 deletions cmd/bitfan/commands/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

"github.com/vjeantet/bitfan/api"
"github.com/vjeantet/bitfan/core"
"github.com/vjeantet/bitfan/lib"
"github.com/vjeantet/bitfan/entrypoint"
)

func init() {
Expand Down Expand Up @@ -63,68 +63,75 @@ When no configuration is passed to the command, bitfan use the config set in glo
}
}

// AutoStart pipelines only when no configuration given as command line args
core.Start(opt)
core.Log().Infoln("bitfan ready")

// Start Pipelines

// Prepare entrypoints
var entrypoints entrypoint.EntrypointList
// From Storage when len == 0
if len(args) == 0 {
opt.AutoStart = true
}
pipelinesToStart := core.Storage().FindPipelinesWithAutoStart(true)
for _, p := range pipelinesToStart {
entryPointPath, err := core.Storage().PreparePipelineExecutionStage(&p)
if err != nil {
core.Log().Fatalln(err)
}

core.Start(opt)
var loc *entrypoint.Entrypoint
loc, err = entrypoint.New(entryPointPath, "", entrypoint.CONTENT_REF)
loc.PipelineName = p.Label
loc.PipelineUuid = p.Uuid
if err != nil {
core.Log().Fatalln(err)
}
entrypoints.AddEntrypoint(loc)
}
}

// Start configumation in config or in STDIN
// TODO : Refactor with RunAutoStartPipelines
var locations lib.Locations
// From config when config == 0
cwd, _ := os.Getwd()

if len(args) == 0 {
for _, v := range viper.GetStringSlice("config") {
loc, _ := lib.NewLocation(v, cwd)
locations.AddLocation(loc)
loc, _ := entrypoint.New(v, cwd, entrypoint.CONTENT_REF)
entrypoints.AddEntrypoint(loc)
}
} else {
}

// From args when config > 0
if len(args) > 0 {
for _, v := range args {
var loc *lib.Location
var loc *entrypoint.Entrypoint
var err error
loc, err = lib.NewLocation(v, cwd)
loc, err = entrypoint.New(v, cwd, entrypoint.CONTENT_REF)
if err != nil {
// is a content ?
loc, err = lib.NewLocationContent(v, cwd)
loc, err = entrypoint.New(v, cwd, entrypoint.CONTENT_INLINE)
if err != nil {
return
core.Log().Fatalln(err)
}
}

locations.AddLocation(loc)
entrypoints.AddEntrypoint(loc)
}
}

for _, loc := range locations.Items {
for _, loc := range entrypoints.Items {
agt, err := loc.ConfigAgents()

if err != nil {
core.Log().Errorf("Error : %s %v", loc.Path, err)
os.Exit(2)
}
ppl := loc.ConfigPipeline()

// Allow pipeline customisation only when only one location was provided by user
if len(locations.Items) == 1 {
if cmd.Flags().Changed("name") {
ppl.Name, _ = cmd.Flags().GetString("name")
}
if cmd.Flags().Changed("id") {
ppl.Uuid, _ = cmd.Flags().GetString("uuid")
}
}

_, err = core.StartPipeline(&ppl, agt)
if err != nil {
core.Log().Errorf("error: %v", err)
os.Exit(1)
}
core.Log().Infof("Pipeline started %s (%s)", ppl.Name, ppl.Uuid)
}

core.Log().Infoln("bitfan ready")

if service.Interactive() {
// Wait for signal CTRL+C for send a stop event to all AgentProcessor
// When CTRL+C, SIGINT and SIGTERM signal occurs
Expand Down Expand Up @@ -158,11 +165,8 @@ func initRunFlags(cmd *cobra.Command) {
cmd.Flags().StringP("host", "H", "127.0.0.1:5123", "Service Host to connect to")

cmd.Flags().Bool("no-network", false, "Disable network (api and webhook)")
cmd.Flags().String("name", "", "set pipeline's name")
cmd.Flags().String("uuid", "", "set pipeline's uuid")
cwd, _ := os.Getwd()
cmd.Flags().String("data", filepath.Join(cwd, ".bitfan"), "Path to data dir")

cmd.Flags().Bool("api", true, "Expose REST Api")
cmd.Flags().Bool("prometheus", false, "Export stats using prometheus output")
cmd.Flags().String("prometheus.path", "/metrics", "Expose Prometheus metrics at specified path.")
Expand Down
14 changes: 7 additions & 7 deletions cmd/bitfan/commands/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"github.com/vjeantet/bitfan/core"
config "github.com/vjeantet/bitfan/core/config"
"github.com/vjeantet/bitfan/lib"
"github.com/vjeantet/bitfan/entrypoint"
)

func init() {
Expand All @@ -21,20 +21,20 @@ var testCmd = &cobra.Command{
Short: "Test configurations (files, url, directories)",
Run: func(cmd *cobra.Command, args []string) {

var locations lib.Locations
var locations entrypoint.EntrypointList
cwd, _ := os.Getwd()
for _, v := range args {
var loc *lib.Location
var loc *entrypoint.Entrypoint
var err error
loc, err = lib.NewLocation(v, cwd)
loc, err = entrypoint.New(v, cwd, entrypoint.CONTENT_REF)
if err != nil {
loc, err = lib.NewLocationContent(v, cwd)
loc, err = entrypoint.New(v, cwd, entrypoint.CONTENT_INLINE)
if err != nil {
return
}
}

locations.AddLocation(loc)
locations.AddEntrypoint(loc)
}

var cko int
Expand All @@ -57,7 +57,7 @@ var testCmd = &cobra.Command{
},
}

func testConfigContent(loc *lib.Location) error {
func testConfigContent(loc *entrypoint.Entrypoint) error {
configAgents, err := loc.ConfigAgents()
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
coverage:
status:
patch: no
72 changes: 0 additions & 72 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@ package core

import (
"fmt"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"strings"
"time"

"golang.org/x/sync/syncmap"

Expand All @@ -17,9 +14,7 @@ import (
"github.com/justinas/alice"
"github.com/prometheus/client_golang/prometheus"
"github.com/vjeantet/bitfan/core/config"
"github.com/vjeantet/bitfan/core/models"
"github.com/vjeantet/bitfan/core/store"
"github.com/vjeantet/bitfan/lib"
)

var (
Expand Down Expand Up @@ -140,13 +135,6 @@ func Start(opt Options) {
panic(err.Error())
}

if opt.AutoStart {
pipelinesToStart := myStore.FindPipelinesWithAutoStart()
for _, p := range pipelinesToStart {
StartPipelineByUUID(p.Uuid)
}
}

if len(opt.HttpHandlers) > 0 {
opt.HttpHandlers = append(opt.HttpHandlers, webHookServer())

Expand All @@ -156,66 +144,6 @@ func Start(opt Options) {
Log().Debugln("bitfan started")
}

func StartPipelineByUUID(UUID string) error {
tPipeline, err := myStore.FindOnePipelineByUUID(UUID, true)
if err != nil {
return err
}

uidString := fmt.Sprintf("%s_%d", tPipeline.Uuid, time.Now().Unix())

cwd := filepath.Join(dataLocation, "_pipelines", uidString)
Log().Debugf("configuration %s stored to %s", uidString, cwd)
os.MkdirAll(cwd, os.ModePerm)

//Save assets to cwd
for _, asset := range tPipeline.Assets {
dest := filepath.Join(cwd, asset.Name)
dir := filepath.Dir(dest)
os.MkdirAll(dir, os.ModePerm)
if err := ioutil.WriteFile(dest, asset.Value, 07770); err != nil {
return err
}

if asset.Type == models.ASSET_TYPE_ENTRYPOINT {
tPipeline.ConfigLocation = filepath.Join(cwd, asset.Name)
}

if tPipeline.ConfigLocation == "" {
return fmt.Errorf("missing entrypoint for pipeline %s", tPipeline.Uuid)
}

Log().Debugf("configuration %s asset %s stored", uidString, asset.Name)
}

Log().Debugf("configuration %s pipeline %s ready to be loaded", uidString, tPipeline.ConfigLocation)

//TODO : resolve lib.Location dans location.Location

var loc *lib.Location
loc, err = lib.NewLocation(tPipeline.ConfigLocation, cwd)
if err != nil {
return err
}

ppl := loc.ConfigPipeline()
ppl.Name = tPipeline.Label
ppl.Uuid = tPipeline.Uuid

agt, err := loc.ConfigAgents()
if err != nil {
return err
}

nUUID, err := StartPipeline(&ppl, agt)
if err != nil {
return err
}

Log().Debugf("Pipeline %s started UUID=%s", tPipeline.Label, nUUID)
return nil
}

// StartPipeline load all agents form a configPipeline and returns pipeline's ID
func StartPipeline(configPipeline *config.Pipeline, configAgents []config.Agent) (string, error) {
p, err := newPipeline(configPipeline, configAgents)
Expand Down
1 change: 0 additions & 1 deletion core/options.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package core

type Options struct {
AutoStart bool
Host string
HttpHandlers []fnMux
Debug bool
Expand Down
Loading

0 comments on commit 1aefd24

Please sign in to comment.