From 461799b73c89d693693dd5cf127326b075e10ead Mon Sep 17 00:00:00 2001 From: Valere JEANTET Date: Sat, 25 Nov 2017 01:57:25 +0100 Subject: [PATCH] merge refactoring add a commons package for templating and Logger interface core metrics, webhook and memory have their own pakage move store, models, outside of core --- api/assets.go | 2 +- api/client/client.go | 2 +- {core => api}/models/asset.go | 0 {core => api}/models/error.go | 0 {core => api}/models/models_test.go | 0 {core => api}/models/pipeline.go | 0 {core => api}/models/processor_doc.go | 0 api/pipelines.go | 13 +- api/swagger.json | 8 +- cmd/bitfan/commands/run.go | 31 ++- cmd/bitfan/commands/test.go | 23 ++- cmd/bitfanUI/server/server.go | 2 +- codecs/codec.go | 8 +- codecs/csv/decoder.go | 6 +- codecs/csv/decoder_test.go | 4 +- codecs/decoder.go | 4 +- codecs/encoder.go | 4 +- codecs/json/decoder.go | 6 +- codecs/json/encoder.go | 4 +- codecs/jsonlines/decoder.go | 6 +- codecs/lib/log_test.go | 7 - codecs/line/decoder.go | 6 +- codecs/line/encoder.go | 9 +- codecs/multiline/decoder.go | 6 +- codecs/plain/decoder.go | 6 +- codecs/plain/encoder.go | 9 +- codecs/rubydebug/encoder.go | 6 +- codecs/w3c/decoder.go | 6 +- codecs/w3c/decoder_test.go | 8 +- {core/location => commons}/location.go | 2 +- {core/location => commons}/location_test.go | 2 +- {codecs/lib => commons}/log.go | 2 +- {core/location => commons}/templatefuncs.go | 2 +- core/agent.go | 173 ++++++++--------- core/{config => }/codec.go | 14 +- core/config/.gitignore | 24 --- core/config/LICENSE | 201 -------------------- core/config/agent_test.go | 7 - core/config/pipeline.go | 33 ---- core/core.go | 89 ++++----- core/{config => }/graph.go | 2 +- core/{ => memory}/memory.go | 28 +-- core/metrics.go | 25 --- core/metrics/metrics.go | 29 +++ core/{ => metrics}/metrics_prometheus.go | 35 ++-- core/options.go | 10 - core/pipeline.go | 87 +++++---- core/{config => }/port.go | 2 +- core/processor.go | 5 - core/processor_context.go | 42 ++++ core/store/log.go | 34 ---- core/{config/agent.go => util.go} | 41 +--- core/{ => webhook}/webhook.go | 33 +++- entrypoint/entrypoint.go | 71 +++++-- entrypoint/parser/pipelinebuilder.go | 128 ++++++------- processors/base.go | 23 ++- processors/codec.go | 7 + processors/commonoptions.go | 20 +- processors/field.go | 24 --- processors/filter-eval/eval.go | 4 +- processors/httppoller/httppoller.go | 8 +- processors/input-exec/execinput.go | 14 +- processors/input-file/file.go | 9 +- processors/input-httpserver/httpserver.go | 2 +- processors/input-stdin/stdin.go | 4 +- processors/input-tail/tail.go | 24 ++- processors/output-email/email.go | 8 +- processors/sql/sql.go | 4 +- processors/stop/stop.go | 7 +- processors/template/template.go | 4 +- {core/store => store}/assets.go | 2 +- {core/store => store}/pipelines.go | 2 +- {core/store => store}/processors.go | 0 {core/store => store}/store.go | 7 +- 74 files changed, 622 insertions(+), 858 deletions(-) rename {core => api}/models/asset.go (100%) rename {core => api}/models/error.go (100%) rename {core => api}/models/models_test.go (100%) rename {core => api}/models/pipeline.go (100%) rename {core => api}/models/processor_doc.go (100%) delete mode 100644 codecs/lib/log_test.go rename {core/location => commons}/location.go (99%) rename {core/location => commons}/location_test.go (80%) rename {codecs/lib => commons}/log.go (98%) rename {core/location => commons}/templatefuncs.go (99%) rename core/{config => }/codec.go (59%) delete mode 100644 core/config/.gitignore delete mode 100644 core/config/LICENSE delete mode 100644 core/config/agent_test.go delete mode 100644 core/config/pipeline.go rename core/{config => }/graph.go (98%) rename core/{ => memory}/memory.go (52%) delete mode 100644 core/metrics.go create mode 100644 core/metrics/metrics.go rename core/{ => metrics}/metrics_prometheus.go (75%) delete mode 100644 core/options.go rename core/{config => }/port.go (96%) delete mode 100644 core/processor.go create mode 100644 core/processor_context.go delete mode 100644 core/store/log.go rename core/{config/agent.go => util.go} (59%) rename core/{ => webhook}/webhook.go (65%) create mode 100644 processors/codec.go rename {core/store => store}/assets.go (98%) rename {core/store => store}/pipelines.go (99%) rename {core/store => store}/processors.go (100%) rename {core/store => store}/store.go (91%) diff --git a/api/assets.go b/api/assets.go index dcd69cf8..cf54427e 100644 --- a/api/assets.go +++ b/api/assets.go @@ -8,7 +8,7 @@ import ( "github.com/gin-gonic/gin" uuid "github.com/nu7hatch/gouuid" "github.com/vjeantet/bitfan/core" - "github.com/vjeantet/bitfan/core/models" + "github.com/vjeantet/bitfan/api/models" "github.com/vjeantet/bitfan/entrypoint/parser/logstash" ) diff --git a/api/client/client.go b/api/client/client.go index e5858425..3cc85d31 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -4,7 +4,7 @@ import ( "fmt" "github.com/dghubble/sling" - "github.com/vjeantet/bitfan/core/models" + "github.com/vjeantet/bitfan/api/models" ) type RestClient struct { diff --git a/core/models/asset.go b/api/models/asset.go similarity index 100% rename from core/models/asset.go rename to api/models/asset.go diff --git a/core/models/error.go b/api/models/error.go similarity index 100% rename from core/models/error.go rename to api/models/error.go diff --git a/core/models/models_test.go b/api/models/models_test.go similarity index 100% rename from core/models/models_test.go rename to api/models/models_test.go diff --git a/core/models/pipeline.go b/api/models/pipeline.go similarity index 100% rename from core/models/pipeline.go rename to api/models/pipeline.go diff --git a/core/models/processor_doc.go b/api/models/processor_doc.go similarity index 100% rename from core/models/processor_doc.go rename to api/models/processor_doc.go diff --git a/api/pipelines.go b/api/pipelines.go index 6c7d9d90..fa9362f9 100644 --- a/api/pipelines.go +++ b/api/pipelines.go @@ -11,7 +11,7 @@ import ( "github.com/mitchellh/mapstructure" uuid "github.com/nu7hatch/gouuid" "github.com/vjeantet/bitfan/core" - "github.com/vjeantet/bitfan/core/models" + "github.com/vjeantet/bitfan/api/models" "github.com/vjeantet/bitfan/entrypoint" "github.com/vjeantet/jodaTime" ) @@ -108,16 +108,15 @@ func (p *PipelineApiController) startPipelineByUUID(UUID string) error { return err } - ppl := loc.ConfigPipeline() - ppl.Name = tPipeline.Label - ppl.Uuid = tPipeline.Uuid - - agt, err := loc.ConfigAgents() + ppl, err := loc.Pipeline() if err != nil { return err } - nUUID, err := core.StartPipeline(&ppl, agt) + ppl.Label = tPipeline.Label + ppl.Uuid = tPipeline.Uuid + + nUUID, err := ppl.Start() if err != nil { return err } diff --git a/api/swagger.json b/api/swagger.json index 3fd70a28..8c30c669 100644 --- a/api/swagger.json +++ b/api/swagger.json @@ -35,7 +35,7 @@ "x-go-name": "Uuid" } }, - "x-go-package": "github.com/vjeantet/bitfan/core/models" + "x-go-package": "github.com/vjeantet/bitfan/api/models" }, "Error": { "description": "A Error is ....\n\nA Error can have.....", @@ -47,7 +47,7 @@ "x-go-name": "Message" } }, - "x-go-package": "github.com/vjeantet/bitfan/core/models" + "x-go-package": "github.com/vjeantet/bitfan/api/models" }, "Pipeline": { "description": "A Pipeline is ....\n\nA Pipeline can have.....", @@ -100,7 +100,7 @@ "x-go-name": "Uuid" } }, - "x-go-package": "github.com/vjeantet/bitfan/core/models" + "x-go-package": "github.com/vjeantet/bitfan/api/models" }, "processorDoc": { "description": "A Doc is ....\n\nA Doc can have.....", @@ -198,7 +198,7 @@ "x-go-name": "Ports" } }, - "x-go-package": "github.com/vjeantet/bitfan/core/models" + "x-go-package": "github.com/vjeantet/bitfan/api/models" } } } \ No newline at end of file diff --git a/cmd/bitfan/commands/run.go b/cmd/bitfan/commands/run.go index 81a5d891..ff869d04 100644 --- a/cmd/bitfan/commands/run.go +++ b/cmd/bitfan/commands/run.go @@ -58,8 +58,9 @@ When no configuration is passed to the command, bitfan use the config set in glo if !viper.GetBool("no-network") { opt.HttpHandlers = append(opt.HttpHandlers, core.HTTPHandler("/api/v2/", api.Handler("api/v2"))) + if viper.IsSet("prometheus") { - opt.HttpHandlers = append(opt.HttpHandlers, core.PrometheusServer(viper.GetString("prometheus.path"))) + opt.Prometheus = viper.GetString("prometheus.path") } } @@ -116,20 +117,32 @@ When no configuration is passed to the command, bitfan use the config set in glo } } - for _, loc := range entrypoints.Items { - agt, err := loc.ConfigAgents() - + for _, ep := range entrypoints.Items { + ppl, err := ep.Pipeline() if err != nil { - core.Log().Errorf("Error : %s %v", loc.Path, err) - os.Exit(2) + core.Log().Fatalln(err) } - ppl := loc.ConfigPipeline() - _, err = core.StartPipeline(&ppl, agt) + + nUUID, err := ppl.Start() if err != nil { core.Log().Errorf("error: %v", err) os.Exit(1) } - core.Log().Infof("Pipeline started %s (%s)", ppl.Name, ppl.Uuid) + core.Log().Infof("Pipeline started %s (%s)(%s)", ppl.Label, ppl.Uuid, nUUID) + + // agt, err := ep.ConfigAgents() + + // if err != nil { + // core.Log().Errorf("Error : %s %v", ep.Path, err) + // os.Exit(2) + // } + // ppl := ep.ConfigPipeline() + // _, err = core.StartPipeline(&ppl, agt) + // if err != nil { + // core.Log().Errorf("error: %v", err) + // os.Exit(1) + // } + // core.Log().Infof("Pipeline started %s (%s)", ppl.Name, ppl.Uuid) } if service.Interactive() { diff --git a/cmd/bitfan/commands/test.go b/cmd/bitfan/commands/test.go index 328ae9b4..8722a3c1 100644 --- a/cmd/bitfan/commands/test.go +++ b/cmd/bitfan/commands/test.go @@ -6,8 +6,6 @@ import ( "github.com/spf13/cobra" - "github.com/vjeantet/bitfan/core" - config "github.com/vjeantet/bitfan/core/config" "github.com/vjeantet/bitfan/entrypoint" ) @@ -58,17 +56,18 @@ var testCmd = &cobra.Command{ } func testConfigContent(loc *entrypoint.Entrypoint) error { - configAgents, err := loc.ConfigAgents() - if err != nil { - return err - } + // TODO : refactor with pipeline + // configAgents, err := loc.ConfigAgents() + // if err != nil { + // return err + // } - configAgentsOrdered := config.Sort(configAgents, config.SortInputsFirst) - for _, configAgent := range configAgentsOrdered { - if _, err := core.NewAgent(configAgent); err != nil { - return err - } - } + // configAgentsOrdered := config.Sort(configAgents, config.SortInputsFirst) + // for _, configAgent := range configAgentsOrdered { + // if _, err := core.NewAgent(configAgent); err != nil { + // return err + // } + // } return nil } diff --git a/cmd/bitfanUI/server/server.go b/cmd/bitfanUI/server/server.go index 7fe947f7..774f97f6 100644 --- a/cmd/bitfanUI/server/server.go +++ b/cmd/bitfanUI/server/server.go @@ -15,7 +15,7 @@ import ( sessions "github.com/tommy351/gin-sessions" "github.com/vjeantet/bitfan/api/client" - "github.com/vjeantet/bitfan/core/models" + "github.com/vjeantet/bitfan/api/models" ) var apiClient *client.RestClient diff --git a/codecs/codec.go b/codecs/codec.go index d2246932..e1a7b2a7 100644 --- a/codecs/codec.go +++ b/codecs/codec.go @@ -5,14 +5,14 @@ import ( "io" "github.com/vjeantet/bitfan/codecs/csv" - "github.com/vjeantet/bitfan/codecs/w3c" "github.com/vjeantet/bitfan/codecs/json" "github.com/vjeantet/bitfan/codecs/jsonlines" - "github.com/vjeantet/bitfan/codecs/lib" "github.com/vjeantet/bitfan/codecs/line" "github.com/vjeantet/bitfan/codecs/multiline" "github.com/vjeantet/bitfan/codecs/plain" "github.com/vjeantet/bitfan/codecs/rubydebug" + "github.com/vjeantet/bitfan/codecs/w3c" + "github.com/vjeantet/bitfan/commons" "golang.org/x/text/encoding" "golang.org/x/text/encoding/ianaindex" ) @@ -49,11 +49,11 @@ type Codec struct { Role string Charset string Options map[string]interface{} - logger lib.Logger + logger commons.Logger configWorkingLocation string } -func New(name string, conf map[string]interface{}, logger lib.Logger, cwl string) *Codec { +func New(name string, conf map[string]interface{}, logger commons.Logger, cwl string) *Codec { c := &Codec{ Name: name, Charset: "utf-8", diff --git a/codecs/csv/decoder.go b/codecs/csv/decoder.go index 971d807b..8e0d76e9 100644 --- a/codecs/csv/decoder.go +++ b/codecs/csv/decoder.go @@ -8,7 +8,7 @@ import ( "io" "github.com/mitchellh/mapstructure" - "github.com/vjeantet/bitfan/codecs/lib" + "github.com/vjeantet/bitfan/commons" ) type decoder struct { @@ -17,7 +17,7 @@ type decoder struct { columnnames []string options decoderOptions comma rune - log lib.Logger + log commons.Logger title bool } @@ -68,7 +68,7 @@ func NewDecoder(r io.Reader) *decoder { return d } -func (d *decoder) SetOptions(conf map[string]interface{}, logger lib.Logger, cwl string) error { +func (d *decoder) SetOptions(conf map[string]interface{}, logger commons.Logger, cwl string) error { d.log = logger if err := mapstructure.Decode(conf, &d.options); err != nil { diff --git a/codecs/csv/decoder_test.go b/codecs/csv/decoder_test.go index d04bfb32..8846e197 100644 --- a/codecs/csv/decoder_test.go +++ b/codecs/csv/decoder_test.go @@ -5,7 +5,7 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/vjeantet/bitfan/codecs/lib" + "github.com/vjeantet/bitfan/commons" ) func TestDefaultSettings(t *testing.T) { @@ -75,7 +75,7 @@ func TestWithCustomColumns(t *testing.T) { "columns": []string{"user_defined_1", "user_defined_2"}, "comment": "#", } - var l lib.Logger + var l commons.Logger err := d.SetOptions(conf, l, "") assert.NoError(t, err) diff --git a/codecs/decoder.go b/codecs/decoder.go index 56613bd6..0fcd662b 100644 --- a/codecs/decoder.go +++ b/codecs/decoder.go @@ -1,9 +1,9 @@ package codecs -import "github.com/vjeantet/bitfan/codecs/lib" +import "github.com/vjeantet/bitfan/commons" type Decoder interface { Decode(*interface{}) error - SetOptions(map[string]interface{}, lib.Logger, string) error + SetOptions(map[string]interface{}, commons.Logger, string) error More() bool } diff --git a/codecs/encoder.go b/codecs/encoder.go index 7d58c2fd..2d743cc8 100644 --- a/codecs/encoder.go +++ b/codecs/encoder.go @@ -1,8 +1,8 @@ package codecs -import "github.com/vjeantet/bitfan/codecs/lib" +import "github.com/vjeantet/bitfan/commons" type Encoder interface { Encode(map[string]interface{}) error - SetOptions(map[string]interface{}, lib.Logger, string) error + SetOptions(map[string]interface{}, commons.Logger, string) error } diff --git a/codecs/json/decoder.go b/codecs/json/decoder.go index c01a1612..a4abba30 100644 --- a/codecs/json/decoder.go +++ b/codecs/json/decoder.go @@ -6,14 +6,14 @@ import ( "io" "github.com/mitchellh/mapstructure" - "github.com/vjeantet/bitfan/codecs/lib" + "github.com/vjeantet/bitfan/commons" ) type decoder struct { d *json.Decoder options decoderOptions - log lib.Logger + log commons.Logger } type decoderOptions struct { @@ -33,7 +33,7 @@ func NewDecoder(r io.Reader) *decoder { options: decoderOptions{}, } } -func (d *decoder) SetOptions(conf map[string]interface{}, logger lib.Logger, cwl string) error { +func (d *decoder) SetOptions(conf map[string]interface{}, logger commons.Logger, cwl string) error { d.log = logger err := mapstructure.Decode(conf, &d.options) diff --git a/codecs/json/encoder.go b/codecs/json/encoder.go index 97fc053e..d8aa02d3 100644 --- a/codecs/json/encoder.go +++ b/codecs/json/encoder.go @@ -5,7 +5,7 @@ import ( "io" "github.com/mitchellh/mapstructure" - "github.com/vjeantet/bitfan/codecs/lib" + "github.com/vjeantet/bitfan/commons" ) type encoder struct { @@ -31,7 +31,7 @@ func NewEncoder(w io.Writer) *encoder { return e } -func (e *encoder) SetOptions(conf map[string]interface{}, logger lib.Logger, cwl string) error { +func (e *encoder) SetOptions(conf map[string]interface{}, logger commons.Logger, cwl string) error { if err := mapstructure.Decode(conf, &e.options); err != nil { return err } diff --git a/codecs/jsonlines/decoder.go b/codecs/jsonlines/decoder.go index d1beeadf..38749a7b 100644 --- a/codecs/jsonlines/decoder.go +++ b/codecs/jsonlines/decoder.go @@ -8,7 +8,7 @@ import ( "strings" "github.com/mitchellh/mapstructure" - "github.com/vjeantet/bitfan/codecs/lib" + "github.com/vjeantet/bitfan/commons" ) type decoder struct { @@ -16,7 +16,7 @@ type decoder struct { r *bufio.Scanner options decoderOptions - log lib.Logger + log commons.Logger } type decoderOptions struct { @@ -63,7 +63,7 @@ func NewDecoder(r io.Reader) *decoder { return d } -func (d *decoder) SetOptions(conf map[string]interface{}, logger lib.Logger, cwl string) error { +func (d *decoder) SetOptions(conf map[string]interface{}, logger commons.Logger, cwl string) error { d.log = logger return mapstructure.Decode(conf, &d.options) diff --git a/codecs/lib/log_test.go b/codecs/lib/log_test.go deleted file mode 100644 index 7422d577..00000000 --- a/codecs/lib/log_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package lib - -import "testing" - -func TestLog(t *testing.T) { - t.Skip("TODO") -} diff --git a/codecs/line/decoder.go b/codecs/line/decoder.go index aaee2a55..2f5147d2 100644 --- a/codecs/line/decoder.go +++ b/codecs/line/decoder.go @@ -7,7 +7,7 @@ import ( "strings" "github.com/mitchellh/mapstructure" - "github.com/vjeantet/bitfan/codecs/lib" + "github.com/vjeantet/bitfan/commons" ) // doc decoder @@ -16,7 +16,7 @@ type decoder struct { r *bufio.Scanner options decoderOptions - log lib.Logger + log commons.Logger } // doc decoderOptions @@ -64,7 +64,7 @@ func NewDecoder(r io.Reader) *decoder { return d } -func (d *decoder) SetOptions(conf map[string]interface{}, logger lib.Logger, cwl string) error { +func (d *decoder) SetOptions(conf map[string]interface{}, logger commons.Logger, cwl string) error { d.log = logger return mapstructure.Decode(conf, &d.options) diff --git a/codecs/line/encoder.go b/codecs/line/encoder.go index 463ceb31..0448e3ba 100644 --- a/codecs/line/encoder.go +++ b/codecs/line/encoder.go @@ -8,8 +8,7 @@ import ( "text/template" "github.com/mitchellh/mapstructure" - "github.com/vjeantet/bitfan/codecs/lib" - "github.com/vjeantet/bitfan/core/location" + "github.com/vjeantet/bitfan/commons" ) // doc encoder @@ -18,7 +17,7 @@ type encoder struct { options encoderOptions formatTpl *template.Template - log lib.Logger + log commons.Logger } // doc encoderOptions @@ -50,7 +49,7 @@ func NewEncoder(w io.Writer) *encoder { return e } -func (e *encoder) SetOptions(conf map[string]interface{}, logger lib.Logger, cwl string) error { +func (e *encoder) SetOptions(conf map[string]interface{}, logger commons.Logger, cwl string) error { e.log = logger if err := mapstructure.Decode(conf, &e.options); err != nil { @@ -61,7 +60,7 @@ func (e *encoder) SetOptions(conf map[string]interface{}, logger lib.Logger, cwl //TODO : add a location.TemplateWithOptions to return golang text/template - loc, err := location.NewLocation(e.options.Format, cwl) + loc, err := commons.NewLocation(e.options.Format, cwl) if err != nil { return err } diff --git a/codecs/multiline/decoder.go b/codecs/multiline/decoder.go index a779143c..37753a21 100644 --- a/codecs/multiline/decoder.go +++ b/codecs/multiline/decoder.go @@ -72,7 +72,7 @@ import ( "strings" "github.com/mitchellh/mapstructure" - "github.com/vjeantet/bitfan/codecs/lib" + "github.com/vjeantet/bitfan/commons" ) // Merges multiline messages into a single event @@ -82,7 +82,7 @@ type decoder struct { options decoderOptions memory string - log lib.Logger + log commons.Logger } // @@ -145,7 +145,7 @@ func NewDecoder(r io.Reader) *decoder { } return d } -func (d *decoder) SetOptions(conf map[string]interface{}, logger lib.Logger, cwl string) error { +func (d *decoder) SetOptions(conf map[string]interface{}, logger commons.Logger, cwl string) error { d.log = logger return mapstructure.Decode(conf, &d.options) diff --git a/codecs/plain/decoder.go b/codecs/plain/decoder.go index c4ada905..44bcd76a 100644 --- a/codecs/plain/decoder.go +++ b/codecs/plain/decoder.go @@ -6,7 +6,7 @@ import ( "io/ioutil" "github.com/mitchellh/mapstructure" - "github.com/vjeantet/bitfan/codecs/lib" + "github.com/vjeantet/bitfan/commons" ) type decoder struct { @@ -14,7 +14,7 @@ type decoder struct { r io.Reader options decoderOptions - log lib.Logger + log commons.Logger } type decoderOptions struct { @@ -30,7 +30,7 @@ func NewDecoder(r io.Reader) *decoder { return d } -func (d *decoder) SetOptions(conf map[string]interface{}, logger lib.Logger, cwl string) error { +func (d *decoder) SetOptions(conf map[string]interface{}, logger commons.Logger, cwl string) error { d.log = logger return mapstructure.Decode(conf, &d.options) diff --git a/codecs/plain/encoder.go b/codecs/plain/encoder.go index 48d1985f..9b531ee0 100644 --- a/codecs/plain/encoder.go +++ b/codecs/plain/encoder.go @@ -8,8 +8,7 @@ import ( "text/template" "github.com/mitchellh/mapstructure" - "github.com/vjeantet/bitfan/codecs/lib" - "github.com/vjeantet/bitfan/core/location" + "github.com/vjeantet/bitfan/commons" ) // doc encoder @@ -18,7 +17,7 @@ type encoder struct { options encoderOptions formatTpl *template.Template - log lib.Logger + log commons.Logger } // doc encoderOptions @@ -47,7 +46,7 @@ func NewEncoder(w io.Writer) *encoder { return e } -func (e *encoder) SetOptions(conf map[string]interface{}, logger lib.Logger, cwl string) error { +func (e *encoder) SetOptions(conf map[string]interface{}, logger commons.Logger, cwl string) error { e.log = logger if err := mapstructure.Decode(conf, &e.options); err != nil { @@ -57,7 +56,7 @@ func (e *encoder) SetOptions(conf map[string]interface{}, logger lib.Logger, cwl if e.options.Format != "" { //TODO : add a location.TemplateWithOptions to return golang text/template - loc, err := location.NewLocation(e.options.Format, cwl) + loc, err := commons.NewLocation(e.options.Format, cwl) if err != nil { return err } diff --git a/codecs/rubydebug/encoder.go b/codecs/rubydebug/encoder.go index 10d2a7c9..745aae53 100644 --- a/codecs/rubydebug/encoder.go +++ b/codecs/rubydebug/encoder.go @@ -7,7 +7,7 @@ import ( "github.com/k0kubun/pp" "github.com/mitchellh/mapstructure" - "github.com/vjeantet/bitfan/codecs/lib" + "github.com/vjeantet/bitfan/commons" "gopkg.in/go-playground/validator.v8" ) @@ -16,7 +16,7 @@ type encoder struct { w io.Writer options encoderOptions - log lib.Logger + log commons.Logger } // Encode options @@ -30,7 +30,7 @@ func NewEncoder(w io.Writer) *encoder { } } -func (e *encoder) SetOptions(conf map[string]interface{}, logger lib.Logger, cwl string) error { +func (e *encoder) SetOptions(conf map[string]interface{}, logger commons.Logger, cwl string) error { e.log = logger if err := mapstructure.Decode(conf, &e.options); err != nil { diff --git a/codecs/w3c/decoder.go b/codecs/w3c/decoder.go index a4cacae6..8df823b2 100644 --- a/codecs/w3c/decoder.go +++ b/codecs/w3c/decoder.go @@ -9,7 +9,7 @@ import ( "strings" "github.com/mitchellh/mapstructure" - "github.com/vjeantet/bitfan/codecs/lib" + "github.com/vjeantet/bitfan/commons" ) type decoder struct { @@ -17,7 +17,7 @@ type decoder struct { r *csv.Reader columnnames []string options decoderOptions - log lib.Logger + log commons.Logger } // Parses comma-separated value data into individual fields @@ -66,7 +66,7 @@ func NewDecoder(r io.Reader) *decoder { return d } -func (d *decoder) SetOptions(conf map[string]interface{}, logger lib.Logger, cwl string) error { +func (d *decoder) SetOptions(conf map[string]interface{}, logger commons.Logger, cwl string) error { d.log = logger if err := mapstructure.Decode(conf, &d.options); err != nil { diff --git a/codecs/w3c/decoder_test.go b/codecs/w3c/decoder_test.go index b7ac907e..e70dc145 100644 --- a/codecs/w3c/decoder_test.go +++ b/codecs/w3c/decoder_test.go @@ -5,7 +5,7 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/vjeantet/bitfan/codecs/lib" + "github.com/vjeantet/bitfan/commons" ) func TestDefaultSettings(t *testing.T) { @@ -51,7 +51,7 @@ func TestDefaultSettings(t *testing.T) { } d := NewDecoder(strings.NewReader(data)) - var l lib.Logger + var l commons.Logger err := d.SetOptions(map[string]interface{}{}, l, "") assert.NoError(t, err) @@ -122,7 +122,7 @@ func TestTabSeparatedColumns(t *testing.T) { } d := NewDecoder(strings.NewReader(data)) - var l lib.Logger + var l commons.Logger err := d.SetOptions(conf, l, "") assert.NoError(t, err) @@ -173,7 +173,7 @@ func TestWithCustomColumns(t *testing.T) { d := NewDecoder(strings.NewReader(data)) - var l lib.Logger + var l commons.Logger err := d.SetOptions(conf, l, "") assert.NoError(t, err) diff --git a/core/location/location.go b/commons/location.go similarity index 99% rename from core/location/location.go rename to commons/location.go index e0b8e25c..5deeadce 100644 --- a/core/location/location.go +++ b/commons/location.go @@ -1,4 +1,4 @@ -package location +package commons import ( "fmt" diff --git a/core/location/location_test.go b/commons/location_test.go similarity index 80% rename from core/location/location_test.go rename to commons/location_test.go index 9d2d89ad..cd5dfd80 100644 --- a/core/location/location_test.go +++ b/commons/location_test.go @@ -1,4 +1,4 @@ -package location +package commons import "testing" diff --git a/codecs/lib/log.go b/commons/log.go similarity index 98% rename from codecs/lib/log.go rename to commons/log.go index 6b855432..2f4e99b9 100644 --- a/codecs/lib/log.go +++ b/commons/log.go @@ -1,4 +1,4 @@ -package lib +package commons type Logger interface { Debug(args ...interface{}) diff --git a/core/location/templatefuncs.go b/commons/templatefuncs.go similarity index 99% rename from core/location/templatefuncs.go rename to commons/templatefuncs.go index 3be667ad..2ed02988 100644 --- a/core/location/templatefuncs.go +++ b/commons/templatefuncs.go @@ -1,4 +1,4 @@ -package location +package commons // Code comes from https://github.com/spf13/hugo/tree/master/tpl diff --git a/core/agent.go b/core/agent.go index 7f837617..6fbdfbf9 100644 --- a/core/agent.go +++ b/core/agent.go @@ -6,11 +6,14 @@ import ( "path/filepath" "sync" - "github.com/vjeantet/bitfan/core/config" + "github.com/vjeantet/bitfan/core/metrics" + "github.com/vjeantet/bitfan/core/webhook" "github.com/vjeantet/bitfan/processors" ) -type agent struct { +type ProcessorFactory func() processors.Processor + +type Agent struct { ID int Label string processor processors.Processor @@ -18,53 +21,68 @@ type agent struct { outputs map[int][]chan *event Done chan bool concurentProcess int - conf config.Agent -} - -func NewAgent(conf config.Agent) (*agent, error) { - return newAgent(conf) + // conf config.Agent + + Sources []string `json:"sources"` + AgentSources PortList + AgentRecipients PortList + Type string `json:"type"` + Schedule string `json:"schedule"` + Trace bool `json:"trace"` + PoolSize int `json:"pool_size"` + PipelineName string + PipelineUUID string + Buffer int `json:"buffer_size"` + Options map[string]interface{} + Wd string +} + +var agentIndex int = 0 + +func NewAgent() Agent { + agentIndex++ + return Agent{ + ID: agentIndex, + } } // build an agent and return its input chan -func newAgent(conf config.Agent) (*agent, error) { +func buildAgent(conf *Agent) error { // Check that the agent's processor type is supported if _, ok := availableProcessorsFactory[conf.Type]; !ok { - return nil, fmt.Errorf("Processor %s not found", conf.Type) + return fmt.Errorf("Processor %s not found", conf.Type) } // Create a new Processor processor proc := availableProcessorsFactory[conf.Type]() if proc == nil { - return nil, fmt.Errorf("Can not start processor %s", conf.Type) + return fmt.Errorf("Can not start processor %s", conf.Type) } - a := &agent{ - packetChan: make(chan *event, conf.Buffer), - outputs: map[int][]chan *event{}, - processor: proc, - Done: make(chan bool), - conf: conf, - } + conf.packetChan = make(chan *event, conf.Buffer) + conf.outputs = map[int][]chan *event{} + conf.processor = proc + conf.Done = make(chan bool) + conf.Options = conf.Options // Configure the agent (and its processor) - if err := a.configure(&conf); err != nil { - return nil, fmt.Errorf("Can not configure agent %s : %v", conf.Type, err) + if err := conf.configure(); err != nil { + return fmt.Errorf("Can not configure agent %s : %v", conf.Type, err) } - return a, nil + return nil } -func (a *agent) configure(conf *config.Agent) error { - a.ID = conf.ID - a.Label = conf.Label - a.processor.SetPipelineUUID(a.conf.PipelineUUID) +func (a *Agent) configure() error { + + a.processor.SetPipelineUUID(a.PipelineUUID) ctx := processorContext{} ctx.logger = NewLogger("pipeline", map[string]interface{}{ - "processor_type": conf.Type, - "pipeline_uuid": conf.PipelineUUID, - "processor_label": conf.Label, + "processor_type": a.Type, + "pipeline_uuid": a.PipelineUUID, + "processor_label": a.Label, }, ) @@ -72,13 +90,13 @@ func (a *agent) configure(conf *config.Agent) error { // data["pipeline_uuid"] = pipelineUUID // data["processor_label"] = proc_label ctx.packetBuilder = newPacket - ctx.dataLocation = filepath.Join(dataLocation, conf.Type) - ctx.configWorkingLocation = conf.Wd - ctx.memory = myMemory.Space(conf.Type) - ctx.webHook = newWebHook(conf.PipelineName, conf.Label) + ctx.dataLocation = filepath.Join(dataLocation, a.Type) + ctx.configWorkingLocation = a.Wd + ctx.memory = myMemory.Space(a.Type) + ctx.webHook = webhook.New(a.PipelineName, a.Label) var err error - ctx.store, err = Storage().NewProcessorStorage(conf.Type) + ctx.store, err = Storage().NewProcessorStorage(a.Type) if err != nil { Log().Errorf("Storage error : %s", err.Error()) } @@ -90,32 +108,32 @@ func (a *agent) configure(conf *config.Agent) error { } } - return a.processor.Configure(ctx, conf.Options) + return a.processor.Configure(ctx, a.Options) } -func (a *agent) traceEvent(way string, packet processors.IPacket, portNumbers ...int) { +func (a *Agent) traceEvent(way string, packet processors.IPacket, portNumbers ...int) { verb := "received" if way == "OUT" { verb = "sent" } Log().e.WithFields( map[string]interface{}{ - "processor_type": a.conf.Type, - "pipeline_uuid": a.conf.PipelineUUID, - "processor_label": a.conf.Label, + "processor_type": a.Type, + "pipeline_uuid": a.PipelineUUID, + "processor_label": a.Label, "event": packet.Fields().Old(), "ports": portNumbers, "trace": way, }, - ).Info(verb + " event by " + a.conf.Label + " on pipeline '" + a.conf.PipelineName + "'") + ).Info(verb + " event by " + a.Label + " on pipeline '" + a.PipelineName + "'") } -func (a *agent) send(packet processors.IPacket, portNumbers ...int) bool { +func (a *Agent) send(packet processors.IPacket, portNumbers ...int) bool { if len(portNumbers) == 0 { portNumbers = []int{0} } - if a.conf.Trace { + if a.Trace { a.traceEvent("OUT", packet, portNumbers...) } @@ -124,71 +142,32 @@ func (a *agent) send(packet processors.IPacket, portNumbers ...int) bool { for _, portNumber := range portNumbers { if len(a.outputs[portNumber]) == 1 { a.outputs[portNumber][0] <- packet.(*event) - metrics.increment(METRIC_PROC_OUT, a.conf.PipelineName, a.Label) + myMetrics.Increment(metrics.PROC_OUT, a.PipelineName, a.Label) } else { // do not use go routine nor waitgroup as it slow down the processing for _, out := range a.outputs[portNumber] { // Clone() is a time killer // TODO : failback if out does not take out packet on x ms (share on a bitfanSlave) out <- packet.Clone().(*event) - metrics.increment(METRIC_PROC_OUT, a.conf.PipelineName, a.Label) + myMetrics.Increment(metrics.PROC_OUT, a.PipelineName, a.Label) } } } return true } -type processorContext struct { - packetSender processors.PacketSender - packetBuilder processors.PacketBuilder - logger processors.Logger - memory processors.Memory - webHook processors.WebHook - store processors.IStore - dataLocation string - configWorkingLocation string -} - -func (p processorContext) Log() processors.Logger { - return p.logger -} -func (p processorContext) Memory() processors.Memory { - return p.memory -} - -func (p processorContext) WebHook() processors.WebHook { - return p.webHook -} -func (p processorContext) PacketSender() processors.PacketSender { - return p.packetSender -} -func (p processorContext) PacketBuilder() processors.PacketBuilder { - return p.packetBuilder -} -func (p processorContext) ConfigWorkingLocation() string { - return p.configWorkingLocation -} - -func (p processorContext) DataLocation() string { - return p.dataLocation -} - -func (p processorContext) Store() processors.IStore { - return p.store -} - -func (a *agent) addOutput(in chan *event, portNumber int) error { +func (a *Agent) addOutput(in chan *event, portNumber int) error { a.outputs[portNumber] = append(a.outputs[portNumber], in) return nil } // Start agent -func (a *agent) start() error { +func (a *Agent) start() error { // Start processor a.processor.Start(newPacket("start", map[string]interface{}{})) // Maximum number of concurent packet consumption ? - var maxConcurentPackets = a.conf.PoolSize + var maxConcurentPackets = a.PoolSize if a.processor.MaxConcurent() > 0 && maxConcurentPackets > a.processor.MaxConcurent() { maxConcurentPackets = a.processor.MaxConcurent() @@ -208,22 +187,22 @@ func (a *agent) start() error { Log().Debugf("processor (%d) - stopping (no more packets)", a.ID) if err := a.processor.Stop(newPacket("", nil)); err != nil { - Log().Errorf("%s %d : %v", a.conf.Type, a.ID, err) + Log().Errorf("%s %d : %v", a.Type, a.ID, err) } close(a.Done) Log().Debugf("processor (%d) - stopped", a.ID) }(maxConcurentPackets) // Register scheduler if needed - if a.conf.Schedule != "" { - Log().Debugf("agent %s : schedule=%s", a.Label, a.conf.Schedule) - err := myScheduler.Add(a.Label, a.conf.Schedule, func() { + if a.Schedule != "" { + Log().Debugf("agent %s : schedule=%s", a.Label, a.Schedule) + err := myScheduler.Add(a.Label, a.Schedule, func() { go a.processor.Tick(newPacket("", nil)) }) if err != nil { Log().Errorf("schedule start failed - %s : %v", a.Label, err) } else { - Log().Debugf("agent %s(%s) scheduled with %s", a.Label, a.ID, a.conf.Schedule) + Log().Debugf("agent %s(%s) scheduled with %s", a.Label, a.ID, a.Schedule) } } @@ -231,25 +210,25 @@ func (a *agent) start() error { } // listen plugs the agent processor to its event chan -func (a *agent) listen(wg *sync.WaitGroup) { +func (a *Agent) listen(wg *sync.WaitGroup) { Log().Debugf("Starting EventLoop on %d-%s", a.ID, a.Label) for e := range a.packetChan { // Receive a work request. - metrics.set(METRIC_CONNECTION_TRANSIT, a.conf.PipelineName, a.Label, len(a.packetChan)) + myMetrics.Set(metrics.CONNECTION_TRANSIT, a.PipelineName, a.Label, len(a.packetChan)) - if a.conf.Trace { + if a.Trace { a.traceEvent("IN", e, 0) } if err := a.processor.Receive(e); err != nil { - Log().Errorf("agent %s: %v", a.conf.Type, err) + Log().Errorf("agent %s: %v", a.Type, err) } - metrics.increment(METRIC_PROC_IN, a.conf.PipelineName, a.Label) + myMetrics.Increment(metrics.PROC_IN, a.PipelineName, a.Label) } wg.Done() } -func (a *agent) stop() { +func (a *Agent) stop() { myScheduler.Remove(a.Label) Log().Debugf("agent %d schedule job removed", a.ID) @@ -266,10 +245,10 @@ func (a *agent) stop() { Log().Debugf("Processor %s stopped", a.Label) } -func (a *agent) pause() { +func (a *Agent) pause() { } -func (a *agent) resume() { +func (a *Agent) resume() { } diff --git a/core/config/codec.go b/core/codec.go similarity index 59% rename from core/config/codec.go rename to core/codec.go index 98b4eb2f..c3a3e2e8 100644 --- a/core/config/codec.go +++ b/core/codec.go @@ -1,4 +1,4 @@ -package config +package core type Codec struct { Name string @@ -17,3 +17,15 @@ func NewCodec(name string) *Codec { func (c *Codec) String() string { return c.Name } + +func (c *Codec) GetName() string { + return c.Name +} + +func (c *Codec) GetRole() string { + return c.Role +} + +func (c *Codec) GetOptions() map[string]interface{} { + return c.Options +} diff --git a/core/config/.gitignore b/core/config/.gitignore deleted file mode 100644 index daf913b1..00000000 --- a/core/config/.gitignore +++ /dev/null @@ -1,24 +0,0 @@ -# Compiled Object files, Static and Dynamic libs (Shared Objects) -*.o -*.a -*.so - -# Folders -_obj -_test - -# Architecture specific extensions/prefixes -*.[568vq] -[568vq].out - -*.cgo1.go -*.cgo2.c -_cgo_defun.c -_cgo_gotypes.go -_cgo_export.* - -_testmain.go - -*.exe -*.test -*.prof diff --git a/core/config/LICENSE b/core/config/LICENSE deleted file mode 100644 index 8dada3ed..00000000 --- a/core/config/LICENSE +++ /dev/null @@ -1,201 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "{}" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright {yyyy} {name of copyright owner} - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. diff --git a/core/config/agent_test.go b/core/config/agent_test.go deleted file mode 100644 index 1eddc013..00000000 --- a/core/config/agent_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package config - -import "testing" - -func TestConfig(t *testing.T) { - t.Skip("TODO") -} diff --git a/core/config/pipeline.go b/core/config/pipeline.go deleted file mode 100644 index 25c7e1cd..00000000 --- a/core/config/pipeline.go +++ /dev/null @@ -1,33 +0,0 @@ -package config - -import ( - "time" - - fqdn "github.com/ShowMax/go-fqdn" - uuid "github.com/nu7hatch/gouuid" -) - -type PipelineState int - -type Pipeline struct { - Uuid string - Name string - Description string - ConfigLocation string - ConfigHostLocation string - - StartedAt time.Time - StoppedAt time.Time -} - -func NewPipeline(name, description, configLocation string) *Pipeline { - uid, _ := uuid.NewV4() - - return &Pipeline{ - Name: name, - Uuid: uid.String(), - Description: description, - ConfigLocation: configLocation, - ConfigHostLocation: fqdn.Get(), - } -} diff --git a/core/core.go b/core/core.go index 4a522143..27078129 100644 --- a/core/core.go +++ b/core/core.go @@ -4,23 +4,21 @@ import ( "fmt" "net/http" "os" - "strings" "golang.org/x/sync/syncmap" - fqdn "github.com/ShowMax/go-fqdn" "github.com/spf13/viper" - "github.com/justinas/alice" - "github.com/prometheus/client_golang/prometheus" - "github.com/vjeantet/bitfan/core/config" - "github.com/vjeantet/bitfan/core/store" + "github.com/vjeantet/bitfan/core/memory" + "github.com/vjeantet/bitfan/core/metrics" + "github.com/vjeantet/bitfan/core/webhook" + "github.com/vjeantet/bitfan/store" ) var ( - metrics Metrics + myMetrics metrics.Metrics myScheduler *scheduler - myMemory *memory + myMemory *memory.Memory myStore *store.Store availableProcessorsFactory map[string]ProcessorFactory = map[string]ProcessorFactory{} @@ -31,12 +29,22 @@ var ( type fnMux func(sm *http.ServeMux) +type Options struct { + Host string + HttpHandlers []fnMux + Debug bool + VerboseLog bool + LogFile string + DataLocation string + Prometheus string +} + func init() { - metrics = &MetricsVoid{} + myMetrics = metrics.New() myScheduler = newScheduler() myScheduler.Start() //Init Store - myMemory = newMemory(dataLocation) + myMemory = memory.New() } // RegisterProcessor is called by the processor loader when the program starts @@ -46,10 +54,6 @@ func RegisterProcessor(name string, procFact ProcessorFactory) { availableProcessorsFactory[name] = procFact } -func setMetrics(s Metrics) { - metrics = s -} - func setDataLocation(location string) error { dataLocation = location fileInfo, err := os.Stat(dataLocation) @@ -78,18 +82,6 @@ func setDataLocation(location string) error { return err } -func webHookServer() fnMux { - whPrefixURL = "/" - commonHandlers := alice.New(loggingHandler, recoverHandler) - return HTTPHandler("/", commonHandlers.ThenFunc(routerHandler)) -} - -// TODO : should be unexported -func PrometheusServer(path string) fnMux { - setMetrics(NewPrometheus()) - return HTTPHandler(path, prometheus.Handler()) -} - // TODO : should be unexported func Storage() *store.Store { return myStore @@ -107,14 +99,7 @@ func listenAndServe(addr string, hs ...fnMux) { h(httpServerMux) } go http.ListenAndServe(addr, httpServerMux) - - addrSpit := strings.Split(addr, ":") - if addrSpit[0] == "0.0.0.0" { - addrSpit[0] = fqdn.Get() - } - - baseURL = fmt.Sprintf("http://%s:%s", addrSpit[0], addrSpit[1]) - Log().Infof("Ready to serve on %s", baseURL) + Log().Infof("Ready to serve on %s", addr) } func Start(opt Options) { @@ -135,8 +120,15 @@ func Start(opt Options) { panic(err.Error()) } + if opt.Prometheus != "" { + m := metrics.NewPrometheus(opt.Prometheus) + opt.HttpHandlers = append(opt.HttpHandlers, HTTPHandler(m.Path, m.HTTPHandler())) + myMetrics = m + } + if len(opt.HttpHandlers) > 0 { - opt.HttpHandlers = append(opt.HttpHandlers, webHookServer()) + webhook.Log = logger + opt.HttpHandlers = append(opt.HttpHandlers, HTTPHandler("/", webhook.Handler(opt.Host))) listenAndServe(opt.Host, opt.HttpHandlers...) } @@ -144,24 +136,6 @@ func Start(opt Options) { Log().Debugln("bitfan started") } -// StartPipeline load all agents form a configPipeline and returns pipeline's ID -func StartPipeline(configPipeline *config.Pipeline, configAgents []config.Agent) (string, error) { - p, err := newPipeline(configPipeline, configAgents) - if err != nil { - return "", err - } - if _, ok := pipelines.Load(p.Uuid); ok { - // a pipeline with same uuid is already running - return "", fmt.Errorf("a pipeline with uuid %s is already running", p.Uuid) - } - - pipelines.Store(p.Uuid, p) - - err = p.start() - - return p.Uuid, err -} - func StopPipeline(Uuid string) error { var err error if p, ok := pipelines.Load(Uuid); ok { @@ -187,13 +161,18 @@ func Stop() error { }) for _, Uuid := range Uuids { - err := StopPipeline(Uuid) + p, ok := GetPipeline(Uuid) + if !ok { + Log().Error("Stop Pipeline - pipeline " + Uuid + " not found") + continue + } + err := p.Stop() if err != nil { Log().Error(err) } } - myMemory.close() + myMemory.Close() myStore.Close() return nil } diff --git a/core/config/graph.go b/core/graph.go similarity index 98% rename from core/config/graph.go rename to core/graph.go index 069506ce..3ebdd0d3 100644 --- a/core/config/graph.go +++ b/core/graph.go @@ -1,4 +1,4 @@ -package config +package core // Sorting order const ( diff --git a/core/memory.go b/core/memory/memory.go similarity index 52% rename from core/memory.go rename to core/memory/memory.go index 5fcb349e..683f6dc9 100644 --- a/core/memory.go +++ b/core/memory/memory.go @@ -1,49 +1,49 @@ -package core +package memory import cache "github.com/patrickmn/go-cache" -type memory struct { +type Memory struct { } -type memorySpace struct { +type MemorySpace struct { name string c *cache.Cache } -func newMemory(location string) *memory { - return &memory{} +func New() *Memory { + return &Memory{} } -func (s *memory) close() { - // persist memory on disk ? +func (s *Memory) Close() { + // persist Memory ? } -func (s *memory) Space(name string) *memorySpace { - return &memorySpace{ +func (s *Memory) Space(name string) *MemorySpace { + return &MemorySpace{ name: name, c: cache.New(cache.NoExpiration, 0), } } // Set add an item to the cache, replacing any existing item with the same name -func (m *memorySpace) Set(name string, value interface{}) { +func (m *MemorySpace) Set(name string, value interface{}) { m.c.Set(name, value, cache.NoExpiration) } // Get an item from the cache. Returns the item or nil, and a bool indicating whether the key was found. -func (m *memorySpace) Get(name string) (interface{}, bool) { +func (m *MemorySpace) Get(name string) (interface{}, bool) { return m.c.Get(name) } -func (m *memorySpace) Delete(name string) { +func (m *MemorySpace) Delete(name string) { m.c.Delete(name) } -func (m *memorySpace) IncrementInt(k string, n int) { +func (m *MemorySpace) IncrementInt(k string, n int) { m.c.IncrementInt(k, n) } -func (m *memorySpace) Items() map[string]interface{} { +func (m *MemorySpace) Items() map[string]interface{} { values := m.c.Items() r := map[string]interface{}{} for key, value := range values { diff --git a/core/metrics.go b/core/metrics.go deleted file mode 100644 index 9cd483fe..00000000 --- a/core/metrics.go +++ /dev/null @@ -1,25 +0,0 @@ -package core - -// IStats interface to any metric collector -type Metrics interface { - increment(int, string, string) error - decrement(int, string, string) error - set(int, string, string, int) error -} - -const ( - METRIC_PROC_IN = iota + 1 - METRIC_PROC_OUT - METRIC_PACKET_DROP - METRIC_CONNECTION_TRANSIT -) - -type MetricsVoid struct{} - -func (o *MetricsVoid) decrement(metric int, pipelineNamestring string, name string) error { - return nil -} -func (o *MetricsVoid) increment(metric int, pipelineNamestring string, name string) error { - return nil -} -func (o *MetricsVoid) set(metric int, pipelineNamestring string, name string, v int) error { return nil } diff --git a/core/metrics/metrics.go b/core/metrics/metrics.go new file mode 100644 index 00000000..f2caf721 --- /dev/null +++ b/core/metrics/metrics.go @@ -0,0 +1,29 @@ +package metrics + +// IStats interface to any metric collector +type Metrics interface { + Increment(int, string, string) error + Decrement(int, string, string) error + Set(int, string, string, int) error +} + +const ( + PROC_IN = iota + 1 + PROC_OUT + PACKET_DROP + CONNECTION_TRANSIT +) + +func New() *MetricsVoid { + return &MetricsVoid{} +} + +type MetricsVoid struct{} + +func (o *MetricsVoid) Decrement(metric int, pipelineNamestring string, name string) error { + return nil +} +func (o *MetricsVoid) Increment(metric int, pipelineNamestring string, name string) error { + return nil +} +func (o *MetricsVoid) Set(metric int, pipelineNamestring string, name string, v int) error { return nil } diff --git a/core/metrics_prometheus.go b/core/metrics/metrics_prometheus.go similarity index 75% rename from core/metrics_prometheus.go rename to core/metrics/metrics_prometheus.go index 22891d52..7be38389 100644 --- a/core/metrics_prometheus.go +++ b/core/metrics/metrics_prometheus.go @@ -1,6 +1,7 @@ -package core +package metrics import ( + "net/http" "runtime" "github.com/prometheus/client_golang/prometheus" @@ -13,10 +14,12 @@ type metricsPrometheus struct { agent_packet_out *prometheus.CounterVec connection_packet_transit *prometheus.GaugeVec goroutines prometheus.GaugeFunc + Path string } -func NewPrometheus() Metrics { +func NewPrometheus(path string) *metricsPrometheus { stats := &metricsPrometheus{ + Path: path, goroutines: prometheus.NewGaugeFunc( prometheus.GaugeOpts{ Namespace: namespace, @@ -29,20 +32,20 @@ func NewPrometheus() Metrics { agent_packet_in: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, - Subsystem: "agent", + Subsystem: "Agent", Name: "packet_consumption", Help: "packets consumed by processors", }, - []string{"pipeline", "agent"}, + []string{"pipeline", "Agent"}, ), agent_packet_out: prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: namespace, - Subsystem: "agent", + Subsystem: "Agent", Name: "packet_production", Help: "packets produced by processors", }, - []string{"pipeline", "agent"}, + []string{"pipeline", "Agent"}, ), connection_packet_transit: prometheus.NewGaugeVec(prometheus.GaugeOpts{ @@ -51,7 +54,7 @@ func NewPrometheus() Metrics { Name: "transit", Help: "packets in transit to processors", }, - []string{"pipeline", "agent"}, + []string{"pipeline", "Agent"}, ), } @@ -63,30 +66,34 @@ func NewPrometheus() Metrics { return stats } -func (s *metricsPrometheus) set(metric int, pipelineName string, name string, v int) error { +func (m *metricsPrometheus) HTTPHandler() http.Handler { + return prometheus.Handler() +} + +func (s *metricsPrometheus) Set(metric int, pipelineName string, name string, v int) error { switch metric { - case METRIC_CONNECTION_TRANSIT: + case CONNECTION_TRANSIT: s.connection_packet_transit.WithLabelValues(pipelineName, name).Set(float64(v)) } return nil } -func (s *metricsPrometheus) increment(metric int, pipelineName string, name string) error { +func (s *metricsPrometheus) Increment(metric int, pipelineName string, name string) error { switch metric { - case METRIC_PROC_OUT: + case PROC_OUT: s.agent_packet_out.WithLabelValues(pipelineName, name).Inc() - case METRIC_PROC_IN: + case PROC_IN: s.agent_packet_in.WithLabelValues(pipelineName, name).Inc() - case METRIC_CONNECTION_TRANSIT: + case CONNECTION_TRANSIT: s.connection_packet_transit.WithLabelValues(pipelineName, name).Inc() } return nil } -func (s *metricsPrometheus) decrement(metric int, pipelineName string, name string) error { +func (s *metricsPrometheus) Decrement(metric int, pipelineName string, name string) error { s.connection_packet_transit.WithLabelValues(pipelineName, name).Dec() return nil } diff --git a/core/options.go b/core/options.go deleted file mode 100644 index 7ce68bdd..00000000 --- a/core/options.go +++ /dev/null @@ -1,10 +0,0 @@ -package core - -type Options struct { - Host string - HttpHandlers []fnMux - Debug bool - VerboseLog bool - LogFile string - DataLocation string -} diff --git a/core/pipeline.go b/core/pipeline.go index be3619d2..856d7df1 100644 --- a/core/pipeline.go +++ b/core/pipeline.go @@ -1,41 +1,67 @@ package core import ( + "fmt" "time" - "github.com/vjeantet/bitfan/core/config" + fqdn "github.com/ShowMax/go-fqdn" + uuid "github.com/nu7hatch/gouuid" ) type Pipeline struct { Uuid string Label string - agents map[int]*agent + agents map[int]*Agent ConfigLocation string ConfigHostLocation string StartedAt time.Time + + Description string } -func newPipeline(conf *config.Pipeline, configAgents []config.Agent) (*Pipeline, error) { - p := &Pipeline{ - Uuid: conf.Uuid, - Label: conf.Name, - ConfigLocation: conf.ConfigLocation, - ConfigHostLocation: conf.ConfigHostLocation, - agents: map[int]*agent{}, +func NewPipeline() *Pipeline { + uid, _ := uuid.NewV4() + + return &Pipeline{ + Uuid: uid.String(), + Label: uid.String(), + Description: "", + ConfigLocation: "", + ConfigHostLocation: fqdn.Get(), + agents: map[int]*Agent{}, } +} + +func (p *Pipeline) AddAgent(a Agent) error { + a.PipelineName = p.Label + a.PipelineUUID = p.Uuid + p.agents[a.ID] = &a + return nil +} + +// Start all agents, begin with last +func (p *Pipeline) Start() (string, error) { + + if _, ok := pipelines.Load(p.Uuid); ok { + // a pipeline with same uuid is already running + return "", fmt.Errorf("a pipeline with uuid %s is already running", p.Uuid) + } + + pipelines.Store(p.Uuid, p) //normalize - configAgents = config.Normalize(configAgents) + for i, _ := range p.agents { + p.agents[i].AgentRecipients = whoWaitForThisAgentID(p.agents[i].ID, p.agents) + } - // for each agents in configAgents (outputs first) - orderedAgentConfList := config.Sort(configAgents, config.SortInputsFirst) + orderedAgentConfList := Sort(p.agents, SortInputsFirst) for _, agentConf := range orderedAgentConfList { agentConf.PipelineUUID = p.Uuid agentConf.PipelineName = p.Label - a, err := newAgent(agentConf) + err := buildAgent(agentConf) if err != nil { - Log().Errorf("%s agent '%-d' can not start", agentConf.Type, agentConf.ID) - return nil, err + Log().Errorf("%s Agent '%-d' can not start", agentConf.Type, agentConf.ID) + return "", err } // register input chan for futur reference and connecting @@ -44,44 +70,25 @@ func newPipeline(conf *config.Pipeline, configAgents []config.Agent) (*Pipeline, // find agent source.ID aSource aSource := p.agents[sourcePort.AgentID] // add a(in) to aSource outputs with port - aSource.addOutput(a.packetChan, sourcePort.PortNumber) + aSource.addOutput(agentConf.packetChan, sourcePort.PortNumber) } - p.addAgent(a) } - return p, nil -} - -func (p *Pipeline) addAgent(a *agent) error { - a.conf.PipelineName = p.Label - a.conf.PipelineUUID = p.Uuid - p.agents[a.ID] = a - - return nil -} - -// Start all agents, begin with last -func (p *Pipeline) start() error { - orderedAgentConfList := config.Sort(p.agentsConfiguration(), config.SortOutputsFirst) + orderedAgentConfList = Sort(p.agents, SortOutputsFirst) for _, agentConf := range orderedAgentConfList { Log().Debugf("start %d - %s", agentConf.ID, p.agents[agentConf.ID].Label) p.agents[agentConf.ID].start() } p.StartedAt = time.Now() - return nil + return p.Uuid, nil } -func (p *Pipeline) agentsConfiguration() []config.Agent { - agentsConf := []config.Agent{} - for _, a := range p.agents { - agentsConf = append(agentsConf, a.conf) - } - return agentsConf +func (p *Pipeline) Stop() error { + return StopPipeline(p.Uuid) } -// Stop all agents, begin with first func (p *Pipeline) stop() error { - orderedAgentConfList := config.Sort(p.agentsConfiguration(), config.SortInputsFirst) + orderedAgentConfList := Sort(p.agents, SortInputsFirst) for _, agentConf := range orderedAgentConfList { Log().Debugf("stop %d - %s", agentConf.ID, p.agents[agentConf.ID].Label) p.agents[agentConf.ID].stop() diff --git a/core/config/port.go b/core/port.go similarity index 96% rename from core/config/port.go rename to core/port.go index 3d576fbb..b98e977e 100644 --- a/core/config/port.go +++ b/core/port.go @@ -1,4 +1,4 @@ -package config +package core import "fmt" diff --git a/core/processor.go b/core/processor.go deleted file mode 100644 index f22a2237..00000000 --- a/core/processor.go +++ /dev/null @@ -1,5 +0,0 @@ -package core - -import "github.com/vjeantet/bitfan/processors" - -type ProcessorFactory func() processors.Processor diff --git a/core/processor_context.go b/core/processor_context.go new file mode 100644 index 00000000..6c8d8456 --- /dev/null +++ b/core/processor_context.go @@ -0,0 +1,42 @@ +package core + +import "github.com/vjeantet/bitfan/processors" + +type processorContext struct { + packetSender processors.PacketSender + packetBuilder processors.PacketBuilder + logger processors.Logger + memory processors.Memory + webHook processors.WebHook + store processors.IStore + dataLocation string + configWorkingLocation string +} + +func (p processorContext) Log() processors.Logger { + return p.logger +} +func (p processorContext) Memory() processors.Memory { + return p.memory +} + +func (p processorContext) WebHook() processors.WebHook { + return p.webHook +} +func (p processorContext) PacketSender() processors.PacketSender { + return p.packetSender +} +func (p processorContext) PacketBuilder() processors.PacketBuilder { + return p.packetBuilder +} +func (p processorContext) ConfigWorkingLocation() string { + return p.configWorkingLocation +} + +func (p processorContext) DataLocation() string { + return p.dataLocation +} + +func (p processorContext) Store() processors.IStore { + return p.store +} diff --git a/core/store/log.go b/core/store/log.go deleted file mode 100644 index 84181775..00000000 --- a/core/store/log.go +++ /dev/null @@ -1,34 +0,0 @@ -package store - -type Logger interface { - Debug(args ...interface{}) - Debugf(format string, args ...interface{}) - Debugln(args ...interface{}) - - Error(args ...interface{}) - Errorf(format string, args ...interface{}) - Errorln(args ...interface{}) - - Fatal(args ...interface{}) - Fatalf(format string, args ...interface{}) - Fatalln(args ...interface{}) - - Info(args ...interface{}) - Infof(format string, args ...interface{}) - Infoln(args ...interface{}) - - Panic(args ...interface{}) - Panicf(format string, args ...interface{}) - Panicln(args ...interface{}) - - Print(args ...interface{}) - Printf(format string, args ...interface{}) - Println(args ...interface{}) - - Warn(args ...interface{}) - Warnf(format string, args ...interface{}) - Warning(args ...interface{}) - Warningf(format string, args ...interface{}) - Warningln(args ...interface{}) - Warnln(args ...interface{}) -} diff --git a/core/config/agent.go b/core/util.go similarity index 59% rename from core/config/agent.go rename to core/util.go index 6639daa9..a6ec0f61 100644 --- a/core/config/agent.go +++ b/core/util.go @@ -1,30 +1,4 @@ -package config - -type Agent struct { - ID int `json:"id"` - Label string `json:"label"` - Sources []string `json:"sources"` - PipelineName string - PipelineUUID string - AgentSources PortList - AgentRecipients PortList - Type string `json:"type"` - Schedule string `json:"schedule"` - Trace bool `json:"trace"` - PoolSize int `json:"pool_size"` - Buffer int `json:"buffer_size"` - Options map[string]interface{} `json:"options"` - Wd string -} - -var agentIndex int = 0 - -func NewAgent() Agent { - agentIndex++ - return Agent{ - ID: agentIndex, - } -} +package core // Sort will return a sorted list of config.Agent, // it sorts agents by computing links dependencies between them @@ -32,8 +6,8 @@ func NewAgent() Agent { // use sortOrder param config.SortInputsFirst to get agents which are not waiting events (no sources) firstly (like inputs) // // use sortOrder param config.SortOutputsFirst to get agents which are not sources of any other agents firstly (like outputs) -func Sort(agentConflist []Agent, sortOrder int) []Agent { - sac := []Agent{} +func Sort(agentConflist map[int]*Agent, sortOrder int) []*Agent { + sac := []*Agent{} // sac = append(sac, agentConflist...) var agentsDependencyGraph = graph{} @@ -65,15 +39,8 @@ func Sort(agentConflist []Agent, sortOrder int) []Agent { return sac } -func Normalize(agentConf []Agent) []Agent { - for k := range agentConf { - agentConf[k].AgentRecipients = whoWaitForThisAgentID(agentConf[k].ID, agentConf) - } - return agentConf -} - // WhoWaitForThisAgentName returns agents recipients as portList -func whoWaitForThisAgentID(ID int, agentConfigurations []Agent) PortList { +func whoWaitForThisAgentID(ID int, agentConfigurations map[int]*Agent) PortList { var recipentAgents = PortList{} for _, agentConfiguration := range agentConfigurations { diff --git a/core/webhook.go b/core/webhook/webhook.go similarity index 65% rename from core/webhook.go rename to core/webhook/webhook.go index 87e594d8..108c689f 100644 --- a/core/webhook.go +++ b/core/webhook/webhook.go @@ -1,4 +1,4 @@ -package core +package webhook import ( "fmt" @@ -7,7 +7,10 @@ import ( "golang.org/x/sync/syncmap" + fqdn "github.com/ShowMax/go-fqdn" "github.com/gosimple/slug" + "github.com/justinas/alice" + "github.com/vjeantet/bitfan/commons" ) type webHook struct { @@ -18,9 +21,10 @@ type webHook struct { var webHookMap = syncmap.Map{} var baseURL = "" -var whPrefixURL = "" +var whPrefixURL = "/" +var Log commons.Logger -func newWebHook(pipelineLabel, nameSpace string) *webHook { +func New(pipelineLabel, nameSpace string) *webHook { return &webHook{pipelineLabel: pipelineLabel, namespace: nameSpace, Hooks: []string{}} } @@ -33,14 +37,14 @@ func (w *webHook) Add(hookName string, hf http.HandlerFunc) { hUrl := w.buildURL(hookName) w.Hooks = append(w.Hooks, hookName) webHookMap.Store(hUrl, hf) - Log().Infof("Hook [%s - %s] %s", w.pipelineLabel, w.namespace, baseURL+hUrl) + Log.Infof("Hook [%s - %s] %s", w.pipelineLabel, w.namespace, baseURL+hUrl) } // Delete a route func (w *webHook) Delete(hookName string) { hUrl := w.buildURL(hookName) webHookMap.Delete(hUrl) - Log().Debugf("WebHook unregisted [%s]", hUrl) + Log.Debugf("WebHook unregisted [%s]", hUrl) } // Delete all routes belonging to webHook @@ -50,13 +54,24 @@ func (w *webHook) Unregister() { } } +func Handler(host string) http.Handler { + addrSpit := strings.Split(host, ":") + if addrSpit[0] == "0.0.0.0" { + addrSpit[0] = fqdn.Get() + } + baseURL = fmt.Sprintf("http://%s:%s", addrSpit[0], addrSpit[1]) + + commonHandlers := alice.New(loggingHandler, recoverHandler) + return commonHandlers.ThenFunc(routerHandler) +} + func routerHandler(w http.ResponseWriter, r *http.Request) { hUrl := strings.ToLower(r.URL.Path) if hfi, ok := webHookMap.Load(hUrl); ok { - Log().Debugf("Webhook found for %s", hUrl) + Log.Debugf("Webhook found for %s", hUrl) hfi.(http.HandlerFunc)(w, r) } else { - Log().Warnf("Webhook not found for %s", hUrl) + Log.Warnf("Webhook not found for %s", hUrl) w.WriteHeader(404) fmt.Fprint(w, "Not Found !") } @@ -65,7 +80,7 @@ func routerHandler(w http.ResponseWriter, r *http.Request) { func loggingHandler(next http.Handler) http.Handler { fn := func(w http.ResponseWriter, r *http.Request) { next.ServeHTTP(w, r) - Log().Debugf("Webhook [%s] %s", r.Method, r.URL.Path) + Log.Debugf("Webhook [%s] %s", r.Method, r.URL.Path) } return http.HandlerFunc(fn) } @@ -74,7 +89,7 @@ func recoverHandler(next http.Handler) http.Handler { fn := func(w http.ResponseWriter, r *http.Request) { defer func() { if err := recover(); err != nil { - Log().Errorf("Webhook panic [%s] %s : %+v", r.Method, r.URL.Path, err) + Log.Errorf("Webhook panic [%s] %s : %+v", r.Method, r.URL.Path, err) http.Error(w, http.StatusText(500), 500) } }() diff --git a/entrypoint/entrypoint.go b/entrypoint/entrypoint.go index 5867db7f..de74460a 100644 --- a/entrypoint/entrypoint.go +++ b/entrypoint/entrypoint.go @@ -11,7 +11,7 @@ import ( "regexp" "strings" - "github.com/vjeantet/bitfan/core/config" + "github.com/vjeantet/bitfan/core" "github.com/vjeantet/bitfan/entrypoint/parser" ) @@ -22,7 +22,7 @@ const ( CONTENT_INLINE // Content is a value ) -// Entrypoint is a the pipeline's definition +// Entrypoint is a the pipeline's definition ressource type Entrypoint struct { Path string Kind int // Kind of content @@ -107,37 +107,78 @@ func (e *EntrypointList) AddEntrypoint(loc *Entrypoint) error { return nil } -// ConfigPipeline returns a core Pipeline from entrypoint's definition -func (e *Entrypoint) ConfigPipeline() config.Pipeline { - var pipeline *config.Pipeline +func (e *Entrypoint) Pipeline() (*core.Pipeline, error) { + pipeline := core.NewPipeline() + + if e.PipelineUuid != "" { + pipeline.Uuid = e.PipelineUuid + } + + if e.PipelineName != "" { + pipeline.Label = e.PipelineName + } switch e.Kind { case CONTENT_INLINE: - pipeline = config.NewPipeline("inline", "nodescription", "inline") + pipeline.Label = "inline" + pipeline.ConfigLocation = "inline" case CONTENT_REF_URL: uriSegments := strings.Split(e.Path, "/") pipelineName := strings.Join(uriSegments[2:], ".") - pipeline = config.NewPipeline(pipelineName, "nodescription", e.Path) + pipeline.Label = pipelineName + pipeline.ConfigLocation = e.Path case CONTENT_REF_FS: filename := filepath.Base(e.Path) extension := filepath.Ext(filename) pipelineName := filename[0 : len(filename)-len(extension)] - pipeline = config.NewPipeline(pipelineName, "nodescription", e.Path) + pipeline.Label = pipelineName + pipeline.ConfigLocation = e.Path } - if e.PipelineName != "" { - pipeline.Name = e.PipelineName + agents, err := e.agents() + if err != nil { + return nil, err } - if e.PipelineUuid != "" { - pipeline.Uuid = e.PipelineUuid + + for _, a := range agents { + pipeline.AddAgent(a) } - return *pipeline + // pipeline.Dagents() + return pipeline, nil } +// ConfigPipeline returns a core Pipeline from entrypoint's definition +// func (e *Entrypoint) ConfigPipeline() config.Pipeline { +// var pipeline *config.Pipeline + +// switch e.Kind { +// case CONTENT_INLINE: +// pipeline = config.NewPipeline("inline", "nodescription", "inline") +// case CONTENT_REF_URL: +// uriSegments := strings.Split(e.Path, "/") +// pipelineName := strings.Join(uriSegments[2:], ".") +// pipeline = config.NewPipeline(pipelineName, "nodescription", e.Path) +// case CONTENT_REF_FS: +// filename := filepath.Base(e.Path) +// extension := filepath.Ext(filename) +// pipelineName := filename[0 : len(filename)-len(extension)] +// pipeline = config.NewPipeline(pipelineName, "nodescription", e.Path) +// } + +// if e.PipelineName != "" { +// pipeline.Name = e.PipelineName +// } +// if e.PipelineUuid != "" { +// pipeline.Uuid = e.PipelineUuid +// } + +// return *pipeline +// } + // ConfigPipeline returns core agents from entrypoint's definition -func (e *Entrypoint) ConfigAgents() ([]config.Agent, error) { - var agents []config.Agent +func (e *Entrypoint) agents() ([]core.Agent, error) { + var agents []core.Agent var content []byte var err error var cwd string diff --git a/entrypoint/parser/pipelinebuilder.go b/entrypoint/parser/pipelinebuilder.go index a2370dd2..a5e29bfd 100644 --- a/entrypoint/parser/pipelinebuilder.go +++ b/entrypoint/parser/pipelinebuilder.go @@ -5,18 +5,18 @@ import ( "fmt" "strconv" - "github.com/vjeantet/bitfan/core/config" + "github.com/vjeantet/bitfan/core" "github.com/vjeantet/bitfan/entrypoint/parser/logstash" ) -var entryPointContent func(string, string,map[string]interface{}) ([]byte, string, error) +var entryPointContent func(string, string, map[string]interface{}) ([]byte, string, error) -func parseConfigLocation(path string, options map[string]interface{}, pwd string, pickSections ...string) ([]config.Agent, error) { +func parseConfigLocation(path string, options map[string]interface{}, pwd string, pickSections ...string) ([]core.Agent, error) { if path == "" { - return []config.Agent{}, fmt.Errorf("no location provided to get content from ; options=%v ", options) + return []core.Agent{}, fmt.Errorf("no location provided to get content from ; options=%v ", options) } - content, cwd, err := entryPointContent(path, pwd,options) + content, cwd, err := entryPointContent(path, pwd, options) if err != nil { return nil, err @@ -26,14 +26,14 @@ func parseConfigLocation(path string, options map[string]interface{}, pwd string return agents, err } -func BuildAgents(content []byte, pwd string, contentProvider func(string, string, map[string]interface{}) ([]byte, string, error)) ([]config.Agent, error) { +func BuildAgents(content []byte, pwd string, contentProvider func(string, string, map[string]interface{}) ([]byte, string, error)) ([]core.Agent, error) { entryPointContent = contentProvider return buildAgents(content, pwd) } -func buildAgents(content []byte, pwd string, pickSections ...string) ([]config.Agent, error) { +func buildAgents(content []byte, pwd string, pickSections ...string) ([]core.Agent, error) { var i int - agentConfList := []config.Agent{} + agentConfList := []core.Agent{} if len(pickSections) == 0 { pickSections = []string{"input", "filter", "output"} } @@ -46,7 +46,7 @@ func buildAgents(content []byte, pwd string, pickSections ...string) ([]config.A return agentConfList, err } - outPorts := []config.Port{} + outPorts := []core.Port{} if _, ok := LSConfiguration.Sections["input"]; ok && isInSlice("input", pickSections) { for pluginIndex := 0; pluginIndex < len(LSConfiguration.Sections["input"].Plugins); pluginIndex++ { @@ -65,7 +65,7 @@ func buildAgents(content []byte, pwd string, pickSections ...string) ([]config.A if _, ok := LSConfiguration.Sections["filter"]; ok && isInSlice("filter", pickSections) { if _, ok := LSConfiguration.Sections["filter"]; ok { for pluginIndex := 0; pluginIndex < len(LSConfiguration.Sections["filter"].Plugins); pluginIndex++ { - var agents []config.Agent + var agents []core.Agent i++ plugin := LSConfiguration.Sections["filter"].Plugins[pluginIndex] agents, outPorts, err = buildFilterAgents(plugin, outPorts, pwd) @@ -80,7 +80,7 @@ func buildAgents(content []byte, pwd string, pickSections ...string) ([]config.A if _, ok := LSConfiguration.Sections["output"]; ok && isInSlice("output", pickSections) { for pluginIndex := 0; pluginIndex < len(LSConfiguration.Sections["output"].Plugins); pluginIndex++ { - var agents []config.Agent + var agents []core.Agent i++ plugin := LSConfiguration.Sections["output"].Plugins[pluginIndex] agents, err = buildOutputAgents(plugin, outPorts, pwd) @@ -96,10 +96,10 @@ func buildAgents(content []byte, pwd string, pickSections ...string) ([]config.A } // TODO : this should return ports to be able to use multiple path use -func buildInputAgents(plugin *logstash.Plugin, pwd string) ([]config.Agent, []config.Port, error) { +func buildInputAgents(plugin *logstash.Plugin, pwd string) ([]core.Agent, []core.Port, error) { - var agent config.Agent - agent = config.NewAgent() + var agent core.Agent + agent = core.NewAgent() agent.Type = "input_" + plugin.Name if plugin.Label == "" { agent.Label = plugin.Name @@ -121,7 +121,7 @@ func buildInputAgents(plugin *logstash.Plugin, pwd string) ([]config.Agent, []co codecs := map[int]interface{}{} for i, codec := range plugin.Codecs { if codec.Name != "" { - pcodec := config.NewCodec(codec.Name) + pcodec := core.NewCodec(codec.Name) for _, setting := range codec.Settings { pcodec.Options[setting.K] = setting.V if setting.K == "role" { @@ -149,15 +149,15 @@ func buildInputAgents(plugin *logstash.Plugin, pwd string) ([]config.Agent, []co } // add agent "use" - set use agent Source as last From FileConfigAgents - inPort := config.Port{AgentID: fileConfigAgents[0].ID, PortNumber: 0} + inPort := core.Port{AgentID: fileConfigAgents[0].ID, PortNumber: 0} agent.AgentSources = append(agent.AgentSources, inPort) - fileConfigAgents = append([]config.Agent{agent}, fileConfigAgents...) + fileConfigAgents = append([]core.Agent{agent}, fileConfigAgents...) - outPort := config.Port{AgentID: fileConfigAgents[0].ID, PortNumber: 0} - return fileConfigAgents, []config.Port{outPort}, nil + outPort := core.Port{AgentID: fileConfigAgents[0].ID, PortNumber: 0} + return fileConfigAgents, []core.Port{outPort}, nil case []interface{}: - CombinedFileConfigAgents := []config.Agent{} - newOutPorts := []config.Port{} + CombinedFileConfigAgents := []core.Agent{} + newOutPorts := []core.Port{} for _, p := range v.([]interface{}) { // contruire le pipeline a fileConfigAgents, err := parseConfigLocation(p.(string), agent.Options, pwd, "input", "filter") @@ -169,7 +169,7 @@ func buildInputAgents(plugin *logstash.Plugin, pwd string) ([]config.Agent, []co CombinedFileConfigAgents = append(CombinedFileConfigAgents, fileConfigAgents...) // add agent "use" - set use agent Source as last From FileConfigAgents - inPort := config.Port{AgentID: fileConfigAgents[0].ID, PortNumber: 0} + inPort := core.Port{AgentID: fileConfigAgents[0].ID, PortNumber: 0} newOutPorts = append(newOutPorts, inPort) } @@ -177,10 +177,10 @@ func buildInputAgents(plugin *logstash.Plugin, pwd string) ([]config.Agent, []co agent.AgentSources = append(agent.AgentSources, newOutPorts...) // add "use" plugin to combined pipelines - CombinedFileConfigAgents = append([]config.Agent{agent}, CombinedFileConfigAgents...) + CombinedFileConfigAgents = append([]core.Agent{agent}, CombinedFileConfigAgents...) // return pipeline a b c ... with theirs respectives outputs - return CombinedFileConfigAgents, []config.Port{{AgentID: agent.ID, PortNumber: 0}}, nil + return CombinedFileConfigAgents, []core.Port{{AgentID: agent.ID, PortNumber: 0}}, nil } } } @@ -221,15 +221,15 @@ func buildInputAgents(plugin *logstash.Plugin, pwd string) ([]config.Agent, []co } } - outPort := config.Port{AgentID: agent.ID, PortNumber: 0} - return []config.Agent{agent}, []config.Port{outPort}, nil + outPort := core.Port{AgentID: agent.ID, PortNumber: 0} + return []core.Agent{agent}, []core.Port{outPort}, nil } -func buildOutputAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd string) ([]config.Agent, error) { - agent_list := []config.Agent{} +func buildOutputAgents(plugin *logstash.Plugin, lastOutPorts []core.Port, pwd string) ([]core.Agent, error) { + agent_list := []core.Agent{} - var agent config.Agent - agent = config.NewAgent() + var agent core.Agent + agent = core.NewAgent() agent.Type = "output_" + plugin.Name if plugin.Label == "" { agent.Label = plugin.Name @@ -251,7 +251,7 @@ func buildOutputAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd codecs := map[int]interface{}{} for i, codec := range plugin.Codecs { if codec.Name != "" { - pcodec := config.NewCodec(codec.Name) + pcodec := core.NewCodec(codec.Name) for _, setting := range codec.Settings { pcodec.Options[setting.K] = setting.V if setting.K == "role" { @@ -285,7 +285,7 @@ func buildOutputAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd firstUsedAgent := &fileConfigAgents[len(fileConfigAgents)-1] for _, sourceport := range lastOutPorts { - inPort := config.Port{AgentID: sourceport.AgentID, PortNumber: sourceport.PortNumber} + inPort := core.Port{AgentID: sourceport.AgentID, PortNumber: sourceport.PortNumber} firstUsedAgent.AgentSources = append(firstUsedAgent.AgentSources, inPort) } @@ -293,7 +293,7 @@ func buildOutputAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd return fileConfigAgents, nil case []interface{}: - CombinedFileConfigAgents := []config.Agent{} + CombinedFileConfigAgents := []core.Agent{} for _, p := range v.([]interface{}) { fileConfigAgents, err := parseConfigLocation(p.(string), agent.Options, pwd, "filter", "output") if err != nil { @@ -302,7 +302,7 @@ func buildOutputAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd firstUsedAgent := &fileConfigAgents[len(fileConfigAgents)-1] for _, sourceport := range lastOutPorts { - inPort := config.Port{AgentID: sourceport.AgentID, PortNumber: sourceport.PortNumber} + inPort := core.Port{AgentID: sourceport.AgentID, PortNumber: sourceport.PortNumber} firstUsedAgent.AgentSources = append(firstUsedAgent.AgentSources, inPort) } CombinedFileConfigAgents = append(CombinedFileConfigAgents, fileConfigAgents...) @@ -314,9 +314,9 @@ func buildOutputAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd } // Plugin Sources - agent.AgentSources = config.PortList{} + agent.AgentSources = core.PortList{} for _, sourceport := range lastOutPorts { - inPort := config.Port{AgentID: sourceport.AgentID, PortNumber: sourceport.PortNumber} + inPort := core.Port{AgentID: sourceport.AgentID, PortNumber: sourceport.PortNumber} agent.AgentSources = append(agent.AgentSources, inPort) } @@ -336,7 +336,7 @@ func buildOutputAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd agent.Options["expressions"].(map[int]string)[expressionIndex] = when.Expression // recupérer le outport associé (expressionIndex) - expressionOutPorts := []config.Port{ + expressionOutPorts := []core.Port{ {AgentID: agent.ID, PortNumber: expressionIndex}, } @@ -344,7 +344,7 @@ func buildOutputAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd // en utilisant le expressionOutPorts for pi := 0; pi < len(when.Plugins); pi++ { p := when.Plugins[pi] - var agents []config.Agent + var agents []core.Agent var err error // récupérer le dernier outport du plugin créé il devient expressionOutPorts agents, err = buildOutputAgents(p, expressionOutPorts, pwd) @@ -369,16 +369,16 @@ func buildOutputAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd } // ajoute l'agent à la liste des agents - agent_list = append([]config.Agent{agent}, agent_list...) + agent_list = append([]core.Agent{agent}, agent_list...) return agent_list, nil } -func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd string) ([]config.Agent, []config.Port, error) { +func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []core.Port, pwd string) ([]core.Agent, []core.Port, error) { - agent_list := []config.Agent{} + agent_list := []core.Agent{} - var agent config.Agent - agent = config.NewAgent() + var agent core.Agent + agent = core.NewAgent() agent.Type = plugin.Name if plugin.Label == "" { agent.Label = plugin.Name @@ -401,7 +401,7 @@ func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd codecs := map[int]interface{}{} for i, codec := range plugin.Codecs { if codec.Name != "" { - pcodec := config.NewCodec(codec.Name) + pcodec := core.NewCodec(codec.Name) for _, setting := range codec.Settings { pcodec.Options[setting.K] = setting.V if setting.K == "role" { @@ -431,18 +431,18 @@ func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd firstUsedAgent := &fileConfigAgents[len(fileConfigAgents)-1] for _, sourceport := range lastOutPorts { - inPort := config.Port{AgentID: sourceport.AgentID, PortNumber: sourceport.PortNumber} + inPort := core.Port{AgentID: sourceport.AgentID, PortNumber: sourceport.PortNumber} firstUsedAgent.AgentSources = append(firstUsedAgent.AgentSources, inPort) } - newOutPorts := []config.Port{ + newOutPorts := []core.Port{ {AgentID: fileConfigAgents[0].ID, PortNumber: 0}, } return fileConfigAgents, newOutPorts, nil case []interface{}: - CombinedFileConfigAgents := []config.Agent{} - newOutPorts := []config.Port{} + CombinedFileConfigAgents := []core.Agent{} + newOutPorts := []core.Port{} for _, p := range v.([]interface{}) { // contruire le pipeline a fileConfigAgents, err := parseConfigLocation(p.(string), agent.Options, pwd, "filter") @@ -453,28 +453,28 @@ func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd // connect pipeline a first agent Xsource to lastOutPorts output firstUsedAgent := &fileConfigAgents[len(fileConfigAgents)-1] for _, sourceport := range lastOutPorts { - inPort := config.Port{AgentID: sourceport.AgentID, PortNumber: sourceport.PortNumber} + inPort := core.Port{AgentID: sourceport.AgentID, PortNumber: sourceport.PortNumber} firstUsedAgent.AgentSources = append(firstUsedAgent.AgentSources, inPort) } // save pipeline a for later return CombinedFileConfigAgents = append(CombinedFileConfigAgents, fileConfigAgents...) // save pipeline a outputs for later return - newOutPorts = append(newOutPorts, config.Port{AgentID: fileConfigAgents[0].ID, PortNumber: 0}) + newOutPorts = append(newOutPorts, core.Port{AgentID: fileConfigAgents[0].ID, PortNumber: 0}) } // connect all collected newOutPorts to "use" agent agent.AgentSources = append(agent.AgentSources, newOutPorts...) - CombinedFileConfigAgents = append([]config.Agent{agent}, CombinedFileConfigAgents...) + CombinedFileConfigAgents = append([]core.Agent{agent}, CombinedFileConfigAgents...) // return pipeline a b c ... with theirs respectives outputs - return CombinedFileConfigAgents, []config.Port{{AgentID: agent.ID, PortNumber: 0}}, nil + return CombinedFileConfigAgents, []core.Port{{AgentID: agent.ID, PortNumber: 0}}, nil } } } // route = set a pipeline, but do not reconnect it if plugin.Name == "route" { - CombinedFileConfigAgents := []config.Agent{} + CombinedFileConfigAgents := []core.Agent{} for _, p := range agent.Options["path"].([]interface{}) { fileConfigAgents, err := parseConfigLocation(p.(string), agent.Options, pwd, "filter", "output") if err != nil { @@ -483,7 +483,7 @@ func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd // connect pipeline a last agent Xsource to lastOutPorts output lastUsedAgent := &fileConfigAgents[0] - lastUsedAgent.AgentSources = append(lastUsedAgent.AgentSources, config.Port{AgentID: agent.ID, PortNumber: 0}) + lastUsedAgent.AgentSources = append(lastUsedAgent.AgentSources, core.Port{AgentID: agent.ID, PortNumber: 0}) CombinedFileConfigAgents = append(CombinedFileConfigAgents, fileConfigAgents...) } @@ -491,10 +491,10 @@ func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd // connect route to lastOutPorts agent.AgentSources = append(agent.AgentSources, lastOutPorts...) // add route to routeedpipelines - CombinedFileConfigAgents = append(CombinedFileConfigAgents, []config.Agent{agent}...) + CombinedFileConfigAgents = append(CombinedFileConfigAgents, []core.Agent{agent}...) // return untouched outputsPorts - return CombinedFileConfigAgents, []config.Port{{AgentID: agent.ID, PortNumber: 1}}, nil + return CombinedFileConfigAgents, []core.Port{{AgentID: agent.ID, PortNumber: 1}}, nil } // interval can be a number, a string number or a cron string pattern @@ -534,20 +534,20 @@ func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd } // Plugin Sources - agent.AgentSources = config.PortList{} + agent.AgentSources = core.PortList{} for _, sourceport := range lastOutPorts { - inPort := config.Port{AgentID: sourceport.AgentID, PortNumber: sourceport.PortNumber} + inPort := core.Port{AgentID: sourceport.AgentID, PortNumber: sourceport.PortNumber} agent.AgentSources = append(agent.AgentSources, inPort) } // By Default Agents output to port 0 - newOutPorts := []config.Port{ + newOutPorts := []core.Port{ {AgentID: agent.ID, PortNumber: 0}, } // Is this Plugin has conditional expressions ? if len(plugin.When) > 0 { - outPorts_when := []config.Port{} + outPorts_when := []core.Port{} // le plugin WHEn est $plugin agent.Options["expressions"] = map[int]string{} elseOK := false @@ -560,7 +560,7 @@ func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd elseOK = true } // recupérer le outport associé (expressionIndex) - expressionOutPorts := []config.Port{ + expressionOutPorts := []core.Port{ {AgentID: agent.ID, PortNumber: expressionIndex}, } @@ -568,7 +568,7 @@ func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd // en utilisant le outportA for pi := 0; pi < len(when.Plugins); pi++ { p := when.Plugins[pi] - var agents []config.Agent + var agents []core.Agent var err error // récupérer le dernier outport du plugin créé il devient outportA agents, expressionOutPorts, err = buildFilterAgents(p, expressionOutPorts, pwd) @@ -587,7 +587,7 @@ func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd // If no else expression was found, insert one if elseOK == false { agent.Options["expressions"].(map[int]string)[len(agent.Options["expressions"].(map[int]string))] = "true" - elseOutPorts := []config.Port{ + elseOutPorts := []core.Port{ {AgentID: agent.ID, PortNumber: len(agent.Options["expressions"].(map[int]string)) - 1}, } newOutPorts = append(elseOutPorts, newOutPorts...) @@ -595,7 +595,7 @@ func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []config.Port, pwd } // ajoute l'agent à la liste des agents - agent_list = append([]config.Agent{agent}, agent_list...) + agent_list = append([]core.Agent{agent}, agent_list...) return agent_list, newOutPorts, nil } diff --git a/processors/base.go b/processors/base.go index ac7e8a9f..fc9cb5c4 100644 --- a/processors/base.go +++ b/processors/base.go @@ -3,7 +3,6 @@ package processors import ( "github.com/mitchellh/mapstructure" "github.com/vjeantet/bitfan/codecs" - "github.com/vjeantet/bitfan/core/config" "github.com/vjeantet/bitfan/processors/doc" "gopkg.in/go-playground/validator.v8" ) @@ -76,18 +75,18 @@ func (b *Base) ConfigureAndValidate(ctx ProcessorContext, conf map[string]interf if v, ok := conf["codecs"]; ok { codecCollection := &codecs.CodecCollection{} for _, v := range v.(map[int]interface{}) { - switch vcodec := v.(type) { - case *config.Codec: - c := codecs.New(vcodec.Name, vcodec.Options, ctx.Log(), ctx.ConfigWorkingLocation()) - switch vcodec.Role { - case "encoder": - codecCollection.Enc = c - case "decoder": - codecCollection.Dec = c - default: - codecCollection.Default = c - } + vcodec := v.(ICodec) + + c := codecs.New(vcodec.GetName(), vcodec.GetOptions(), ctx.Log(), ctx.ConfigWorkingLocation()) + switch vcodec.GetRole() { + case "encoder": + codecCollection.Enc = c + case "decoder": + codecCollection.Dec = c + default: + codecCollection.Default = c } + } conf["codec"] = codecCollection delete(conf, "codecs") diff --git a/processors/codec.go b/processors/codec.go new file mode 100644 index 00000000..bfe59cbe --- /dev/null +++ b/processors/codec.go @@ -0,0 +1,7 @@ +package processors + +type ICodec interface { + GetName() string + GetOptions() map[string]interface{} + GetRole() string +} diff --git a/processors/commonoptions.go b/processors/commonoptions.go index 141afc42..bd54c33e 100644 --- a/processors/commonoptions.go +++ b/processors/commonoptions.go @@ -1,8 +1,6 @@ package processors -import ( - "github.com/clbanning/mxj" -) +import "github.com/clbanning/mxj" type CommonOptions struct { // If this filter is successful, add any arbitrary fields to this event. @@ -34,5 +32,19 @@ type CommonOptions struct { } func (c *CommonOptions) ProcessCommonOptions(data *mxj.Map) { - processCommonFields(data, c.AddField, c.AddTag, c.Type, c.RemoveField, c.RemoveTag) + if len(c.AddField) > 0 { + AddFields(c.AddField, data) + } + if len(c.AddTag) > 0 { + AddTags(c.AddTag, data) + } + if c.Type != "" { + SetType(c.Type, data) + } + if len(c.RemoveField) > 0 { + RemoveFields(c.RemoveField, data) + } + if len(c.RemoveTag) > 0 { + RemoveTags(c.RemoveTag, data) + } } diff --git a/processors/field.go b/processors/field.go index c1df418e..b040b44b 100644 --- a/processors/field.go +++ b/processors/field.go @@ -52,30 +52,6 @@ func Dynamic(str *string, fields *mxj.Map) { } } -func processCommonFields(data *mxj.Map, - add_fields map[string]interface{}, - tags []string, - typevalue string, - remove_field []string, - remove_tag []string) { - - if len(add_fields) > 0 { - AddFields(add_fields, data) - } - if len(tags) > 0 { - AddTags(tags, data) - } - if typevalue != "" { - SetType(typevalue, data) - } - if len(remove_field) > 0 { - RemoveFields(remove_field, data) - } - if len(remove_tag) > 0 { - RemoveTags(remove_tag, data) - } -} - func SetType(typevalue string, data *mxj.Map) { Dynamic(&typevalue, data) data.SetValueForPath(typevalue, "type") diff --git a/processors/filter-eval/eval.go b/processors/filter-eval/eval.go index abe855bb..05f32bc1 100644 --- a/processors/filter-eval/eval.go +++ b/processors/filter-eval/eval.go @@ -30,7 +30,7 @@ import ( "text/template" "github.com/Knetic/govaluate" - "github.com/vjeantet/bitfan/core/location" + "github.com/vjeantet/bitfan/commons" "github.com/vjeantet/bitfan/processors" ) @@ -86,7 +86,7 @@ func (p *processor) Configure(ctx processors.ProcessorContext, conf map[string]i //Prepare templates for key, tplLocStr := range p.opt.Templates { - loc, err := location.NewLocation(tplLocStr, p.ConfigWorkingLocation) + loc, err := commons.NewLocation(tplLocStr, p.ConfigWorkingLocation) if err != nil { return err } diff --git a/processors/httppoller/httppoller.go b/processors/httppoller/httppoller.go index 045c86e5..0170eee7 100644 --- a/processors/httppoller/httppoller.go +++ b/processors/httppoller/httppoller.go @@ -9,7 +9,7 @@ import ( "github.com/parnurzeal/gorequest" "github.com/vjeantet/bitfan/codecs" - "github.com/vjeantet/bitfan/core/location" + "github.com/vjeantet/bitfan/commons" "github.com/vjeantet/bitfan/processors" ) @@ -86,7 +86,7 @@ func (p *processor) Configure(ctx processors.ProcessorContext, conf map[string]i err := p.ConfigureAndValidate(ctx, conf, p.opt) if p.opt.Body != "" { - loc, err := location.NewLocation(p.opt.Body, p.ConfigWorkingLocation) + loc, err := commons.NewLocation(p.opt.Body, p.ConfigWorkingLocation) if err != nil { return err } @@ -181,11 +181,11 @@ func (p *processor) Receive(e processors.IPacket) error { var record interface{} if err = dec.Decode(&record); err != nil { if err == io.EOF { - p.Logger.Warnln("error while http read docoding : ", err) + p.Logger.Debugln("error while http read docoding : ", err) } else { p.Logger.Errorln("error while http read docoding : ", err) - break } + return nil } e2 := e.Clone() diff --git a/processors/input-exec/execinput.go b/processors/input-exec/execinput.go index 6d72fef4..f8aa9b3d 100644 --- a/processors/input-exec/execinput.go +++ b/processors/input-exec/execinput.go @@ -73,6 +73,10 @@ func (p *processor) Tick(e processors.IPacket) error { var dec codecs.Decoder pr, pw := io.Pipe() + defer func() { + pr.Close() + pw.Close() + }() if dec, err = p.opt.Codec.NewDecoder(pr); err != nil { p.Logger.Errorln("decoder error : ", err.Error()) @@ -88,7 +92,13 @@ func (p *processor) Tick(e processors.IPacket) error { for dec.More() { var record interface{} if err := dec.Decode(&record); err != nil { - return err + if err == io.EOF { + p.Logger.Debugln("error while exec docoding : ", err) + return nil + } else { + p.Logger.Errorln("error while exec docoding : ", err) + return err + } } else { ne := p.NewPacket(data, map[string]interface{}{ "host": p.host, @@ -111,8 +121,6 @@ func (p *processor) Tick(e processors.IPacket) error { }() err = cmd.Wait() - pw.Close() - // ----- if err != nil { diff --git a/processors/input-file/file.go b/processors/input-file/file.go index b6638f13..124ec2c5 100644 --- a/processors/input-file/file.go +++ b/processors/input-file/file.go @@ -9,6 +9,7 @@ package file import ( "fmt" + "io" "os" "path/filepath" "time" @@ -289,7 +290,13 @@ func (p *processor) readfile(pathfile string) error { var record interface{} if err := dec.Decode(&record); err != nil { - return err + if err == io.EOF { + p.Logger.Debugln("error while exec docoding : ", err) + return nil + } else { + p.Logger.Errorln("error while exec docoding : ", err) + return err + } } else { var e processors.IPacket switch v := record.(type) { diff --git a/processors/input-httpserver/httpserver.go b/processors/input-httpserver/httpserver.go index ba308bad..08a5b15c 100644 --- a/processors/input-httpserver/httpserver.go +++ b/processors/input-httpserver/httpserver.go @@ -163,7 +163,7 @@ func (p *processor) HttpHandler(w http.ResponseWriter, r *http.Request) { body = map[string]interface{}{} if err = dec.Decode(&record); err != nil { if err == io.EOF { - p.Logger.Warnln("error while http read docoding : ", err) + p.Logger.Debugln("error while http read docoding : ", err) } else { p.Logger.Errorln("error while http read docoding : ", err) break diff --git a/processors/input-stdin/stdin.go b/processors/input-stdin/stdin.go index 8a5b9542..e83e1450 100644 --- a/processors/input-stdin/stdin.go +++ b/processors/input-stdin/stdin.go @@ -9,7 +9,6 @@ import ( "time" "github.com/vjeantet/bitfan/codecs" - "github.com/vjeantet/bitfan/core" "github.com/vjeantet/bitfan/processors" ) @@ -85,7 +84,8 @@ func (p *processor) Start(e processors.IPacket) error { if err == io.EOF { p.Logger.Debugf("codec end of file", err.Error()) if p.opt.EofExit { - core.Stop() + // TODO core.Stop() + p.Logger.Fatalln("IMPLEMENT THIS") p, _ := os.FindProcess(os.Getpid()) p.Signal(os.Interrupt) } diff --git a/processors/input-tail/tail.go b/processors/input-tail/tail.go index 9a82c0b1..f68568c5 100644 --- a/processors/input-tail/tail.go +++ b/processors/input-tail/tail.go @@ -154,7 +154,6 @@ func (p *processor) filesToRead() ([]string, error) { // find files for _, currentPath := range fixedPaths { if currentMatches, err := zglob.Glob(currentPath); err == nil { - // if currentMatches, err := filepath.Glob(currentPath); err == nil { matches = append(matches, currentMatches...) continue } @@ -244,7 +243,13 @@ func (p *processor) Start(e processors.IPacket) error { if err := p.discoverFilesToRead(); err != nil { p.Logger.Error(err) } - <-ticker.C + select { + case <-ticker.C: + continue + case <-p.q: + return + } + } }() @@ -305,6 +310,10 @@ func (p *processor) tailFile(path string, q chan bool) error { var dec codecs.Decoder pr, pw := io.Pipe() + defer func() { + pr.Close() + pw.Close() + }() if dec, err = p.opt.Codec.NewDecoder(pr); err != nil { p.Logger.Errorln("decoder error : ", err.Error()) @@ -312,10 +321,15 @@ func (p *processor) tailFile(path string, q chan bool) error { } go func() { - for { + for dec.More() { var record interface{} if err := dec.Decode(&record); err != nil { - p.Logger.Errorln("codec error : ", err.Error()) + if err == io.EOF { + p.Logger.Debugf("codec EOF ") + } else { + p.Logger.Errorln("codec error : ", err.Error()) + } + return } var e processors.IPacket @@ -349,8 +363,6 @@ func (p *processor) tailFile(path string, q chan bool) error { for line := range t.Lines { fmt.Fprintf(pw, "%s\n", line.Text) } - pr.Close() - pw.Close() return nil } diff --git a/processors/output-email/email.go b/processors/output-email/email.go index dc4ef489..e4d5e9e2 100644 --- a/processors/output-email/email.go +++ b/processors/output-email/email.go @@ -8,7 +8,7 @@ import ( "path/filepath" "strings" - "github.com/vjeantet/bitfan/core/location" + "github.com/vjeantet/bitfan/commons" "github.com/vjeantet/bitfan/processors" gomail "gopkg.in/gomail.v2" ) @@ -158,7 +158,7 @@ func (p *processor) Receive(e processors.IPacket) error { // pp.Println("subject-->", subject) if p.opt.Subject != "" { - loc, err := location.NewLocation(p.opt.Subject, p.ConfigWorkingLocation) + loc, err := commons.NewLocation(p.opt.Subject, p.ConfigWorkingLocation) if err != nil { p.Logger.Errorf("email subject template error : %v", err) return err @@ -174,7 +174,7 @@ func (p *processor) Receive(e processors.IPacket) error { } if p.opt.HTMLBody != "" { - loc, err := location.NewLocation(p.opt.HTMLBody, p.ConfigWorkingLocation) + loc, err := commons.NewLocation(p.opt.HTMLBody, p.ConfigWorkingLocation) if err != nil { p.Logger.Errorf("email subject template error : %v", err) return err @@ -191,7 +191,7 @@ func (p *processor) Receive(e processors.IPacket) error { } if p.opt.Body != "" { - loc, err := location.NewLocation(p.opt.Body, p.ConfigWorkingLocation) + loc, err := commons.NewLocation(p.opt.Body, p.ConfigWorkingLocation) if err != nil { p.Logger.Errorf("email subject template error : %v", err) return err diff --git a/processors/sql/sql.go b/processors/sql/sql.go index 86eb645d..b8054acc 100644 --- a/processors/sql/sql.go +++ b/processors/sql/sql.go @@ -10,7 +10,7 @@ import ( fqdn "github.com/ShowMax/go-fqdn" _ "github.com/go-sql-driver/mysql" - "github.com/vjeantet/bitfan/core/location" + "github.com/vjeantet/bitfan/commons" "github.com/vjeantet/bitfan/processors" ) @@ -87,7 +87,7 @@ func (p *processor) Configure(ctx processors.ProcessorContext, conf map[string]i p.Logger.Warningln("No interval set") } - loc, err := location.NewLocation(p.opt.Statement, p.ConfigWorkingLocation) + loc, err := commons.NewLocation(p.opt.Statement, p.ConfigWorkingLocation) if err != nil { return err } diff --git a/processors/stop/stop.go b/processors/stop/stop.go index eacf4850..8c03fcf4 100644 --- a/processors/stop/stop.go +++ b/processors/stop/stop.go @@ -8,7 +8,6 @@ package stopprocessor import ( "os" - "github.com/vjeantet/bitfan/core" "github.com/vjeantet/bitfan/processors" ) @@ -45,10 +44,12 @@ func (p *processor) Receive(e processors.IPacket) error { p.opt.ProcessCommonOptions(e.Fields()) p.Send(e) - core.StopPipeline(p.PipelineUUID) + // TODO core.StopPipeline(p.PipelineUUID) + p.Logger.Fatalln("IMPLEMENT THIS") if true == p.opt.ExitBitfan { - core.Stop() + // TODO core.Stop() + p.Logger.Fatalln("IMPLEMENT THIS") p, _ := os.FindProcess(os.Getpid()) p.Signal(os.Interrupt) } diff --git a/processors/template/template.go b/processors/template/template.go index 69a778b6..940282ee 100644 --- a/processors/template/template.go +++ b/processors/template/template.go @@ -5,7 +5,7 @@ import ( "bytes" "text/template" - "github.com/vjeantet/bitfan/core/location" + "github.com/vjeantet/bitfan/commons" "github.com/vjeantet/bitfan/processors" ) @@ -57,7 +57,7 @@ func (p *processor) Configure(ctx processors.ProcessorContext, conf map[string]i return err } - loc, err := location.NewLocation(p.opt.Location, p.ConfigWorkingLocation) + loc, err := commons.NewLocation(p.opt.Location, p.ConfigWorkingLocation) if err != nil { return err } diff --git a/core/store/assets.go b/store/assets.go similarity index 98% rename from core/store/assets.go rename to store/assets.go index 8702fc4f..08271311 100644 --- a/core/store/assets.go +++ b/store/assets.go @@ -5,7 +5,7 @@ import ( "time" "github.com/timshannon/bolthold" - "github.com/vjeantet/bitfan/core/models" + "github.com/vjeantet/bitfan/api/models" ) type StoreAsset struct { diff --git a/core/store/pipelines.go b/store/pipelines.go similarity index 99% rename from core/store/pipelines.go rename to store/pipelines.go index 867e5082..2a2e4824 100644 --- a/core/store/pipelines.go +++ b/store/pipelines.go @@ -5,7 +5,7 @@ import ( "time" "github.com/timshannon/bolthold" - "github.com/vjeantet/bitfan/core/models" + "github.com/vjeantet/bitfan/api/models" ) type StorePipeline struct { diff --git a/core/store/processors.go b/store/processors.go similarity index 100% rename from core/store/processors.go rename to store/processors.go diff --git a/core/store/store.go b/store/store.go similarity index 91% rename from core/store/store.go rename to store/store.go index a637f1cb..c7835a5f 100644 --- a/core/store/store.go +++ b/store/store.go @@ -10,16 +10,17 @@ import ( "github.com/boltdb/bolt" "github.com/timshannon/bolthold" - "github.com/vjeantet/bitfan/core/models" + "github.com/vjeantet/bitfan/api/models" + "github.com/vjeantet/bitfan/commons" ) type Store struct { db *bolthold.Store pipelineTmpPath string - log Logger + log commons.Logger } -func New(location string, log Logger) (*Store, error) { +func New(location string, log commons.Logger) (*Store, error) { database, err := bolthold.Open(filepath.Join(location, "bitfan.bolt.db"), 0666, nil) pipelineTmpPath := filepath.Join(location, "_pipelines")