diff --git a/cmd/bitfan/commands/plugins.go b/cmd/bitfan/commands/plugins.go index 3a67ab61..6b3adf90 100644 --- a/cmd/bitfan/commands/plugins.go +++ b/cmd/bitfan/commands/plugins.go @@ -39,6 +39,7 @@ import ( fileoutput "github.com/vjeantet/bitfan/processors/output-file" glusterfsoutput "github.com/vjeantet/bitfan/processors/output-glusterfs" httpoutput "github.com/vjeantet/bitfan/processors/output-http" + tcpoutput "github.com/vjeantet/bitfan/processors/output-tcp" mongodb "github.com/vjeantet/bitfan/processors/output-mongodb" null "github.com/vjeantet/bitfan/processors/output-null" rabbitmqoutput "github.com/vjeantet/bitfan/processors/output-rabbitmq" @@ -127,6 +128,7 @@ func init() { initPlugin("output", "rabbitmq", rabbitmqoutput.New) initPlugin("output", "email", email.New) initPlugin("output", "http", httpoutput.New) + initPlugin("output", "tcp", tcpoutput.New) initPlugin("output", "sql", sqlprocessor.New) initPlugin("output", "template", templateprocessor.New) initPlugin("output", "httpout", httpoutprocessor.New) diff --git a/cmd/bitfan/commands/run.go b/cmd/bitfan/commands/run.go index d5df39d1..c2b9a839 100644 --- a/cmd/bitfan/commands/run.go +++ b/cmd/bitfan/commands/run.go @@ -108,8 +108,20 @@ When no configuration is passed to the command, bitfan use the config set in glo cwd, _ := os.Getwd() if len(args) == 0 { for _, v := range viper.GetStringSlice("config") { - loc, _ := entrypoint.New(v, cwd, entrypoint.CONTENT_REF) - entrypoints.AddEntrypoint(loc) + files, err := filepath.Glob(v) + if err != nil { + core.Log().Errorf("can't match '%s' with err: %v", v, err) + continue + } + for _, file := range files { + core.Log().Debugf("add config file '%s'", file) + loc, err := entrypoint.New(file, cwd, entrypoint.CONTENT_REF) + if err != nil { + core.Log().Errorf("can't add etrypoint from '%s' with err: %v", file, err) + continue + } + entrypoints.AddEntrypoint(loc) + } } } diff --git a/docs/data/processors/tcpoutput.json b/docs/data/processors/tcpoutput.json new file mode 100644 index 00000000..90571915 --- /dev/null +++ b/docs/data/processors/tcpoutput.json @@ -0,0 +1,78 @@ +{ + "Behavior": "", + "Doc": "", + "DocShort": "", + "ImportPath": "github.com/vjeantet/bitfan/processors/output-tcp", + "Name": "tcpoutput", + "Options": { + "Doc": "", + "Options": [ + { + "Alias": "codec", + "DefaultValue": "\"line\"", + "Doc": "The codec used for input data. Input codecs are a convenient method for decoding\nyour data before it enters the input, without needing a separate filter in your bitfan pipeline", + "ExampleLS": "", + "Name": "Codec", + "PossibleValues": [ + "\"json\"", + "\"line\"", + "\"pp\"", + "\"rubydebug\"" + ], + "Required": false, + "Type": "codec" + }, + { + "Alias": "host", + "DefaultValue": null, + "Doc": "", + "ExampleLS": "", + "Name": "Host", + "PossibleValues": null, + "Required": true, + "Type": "string" + }, + { + "Alias": "port", + "DefaultValue": null, + "Doc": "", + "ExampleLS": "", + "Name": "Port", + "PossibleValues": null, + "Required": true, + "Type": "uint" + }, + { + "Alias": "keepalive", + "DefaultValue": "true", + "Doc": "Turn this on to enable HTTP keepalive support. Default value is true", + "ExampleLS": "", + "Name": "KeepAlive", + "PossibleValues": null, + "Required": false, + "Type": "bool" + }, + { + "Alias": "request_timeout", + "DefaultValue": "30", + "Doc": "Timeout (in seconds) for the entire request. Default value is 60", + "ExampleLS": "", + "Name": "RequestTimeout", + "PossibleValues": null, + "Required": false, + "Type": "uint" + }, + { + "Alias": "retry_interval", + "DefaultValue": "10", + "Doc": "", + "ExampleLS": "", + "Name": "RetryInterval", + "PossibleValues": null, + "Required": false, + "Type": "uint" + } + ] + }, + "Ports": [] +} \ No newline at end of file diff --git a/processors/doc/doccodec.go b/processors/doc/doccodec.go index d913a210..a3a2aa14 100644 --- a/processors/doc/doccodec.go +++ b/processors/doc/doccodec.go @@ -6,6 +6,7 @@ import ( "go/doc" "go/parser" "go/token" + "os" "regexp" "strings" @@ -39,8 +40,8 @@ func NewCodec(pkgPath string) (*Codec, error) { docPkg := doc.New(astPkg, pkgPath, doc.AllDecls) dp.PkgName = docPkg.Name - - dp.ImportPath = strings.TrimPrefix(docPkg.ImportPath, build.Default.GOPATH+"/src/") + p := string(os.PathSeparator) + dp.ImportPath = strings.Replace(strings.TrimPrefix(docPkg.ImportPath, build.Default.GOPATH+p+"src"+p), "\\", "/", -1) dp.Doc = removeSpecialComment(docPkg.Doc) diff --git a/processors/doc/docprocessor.go b/processors/doc/docprocessor.go index 6c9020e1..9ca2f6fc 100644 --- a/processors/doc/docprocessor.go +++ b/processors/doc/docprocessor.go @@ -69,8 +69,8 @@ func NewProcessor(pkgPath string) (*Processor, error) { docPkg := doc.New(astPkg, pkgPath, doc.AllDecls) dp.Name = docPkg.Name - dp.ImportPath = strings.TrimPrefix(docPkg.ImportPath, build.Default.GOPATH+"/src/") - + p := string(os.PathSeparator) + dp.ImportPath = strings.Replace(strings.TrimPrefix(docPkg.ImportPath, build.Default.GOPATH+p+"src"+p), "\\", "/", -1) dp.Doc = removeSpecialComment(docPkg.Doc) for _, typ := range docPkg.Types { diff --git a/processors/output-tcp/docdoc.go b/processors/output-tcp/docdoc.go new file mode 100644 index 00000000..01713a2d --- /dev/null +++ b/processors/output-tcp/docdoc.go @@ -0,0 +1,85 @@ +// Code generated by "bitfanDoc -processor output-tcp"; DO NOT EDIT +package tcpoutput + +import "github.com/vjeantet/bitfan/processors/doc" + +func (p *processor) Doc() *doc.Processor { + return &doc.Processor{ + Behavior: "", + Name: "tcpoutput", + ImportPath: "github.com/vjeantet/bitfan/processors/output-tcp", + Doc: "", + DocShort: "", + Options: &doc.ProcessorOptions{ + Doc: "", + Options: []*doc.ProcessorOption{ + &doc.ProcessorOption{ + Name: "Codec", + Alias: "codec", + Doc: "The codec used for input data. Input codecs are a convenient method for decoding\nyour data before it enters the input, without needing a separate filter in your bitfan pipeline", + Required: false, + Type: "codec", + DefaultValue: "line", + PossibleValues: []string{ + "json", + "line", + "pp", + "rubydebug", + }, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "Host", + Alias: "host", + Doc: "", + Required: true, + Type: "string", + DefaultValue: nil, + PossibleValues: []string{}, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "Port", + Alias: "port", + Doc: "", + Required: true, + Type: "uint", + DefaultValue: nil, + PossibleValues: []string{}, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "KeepAlive", + Alias: "keepalive", + Doc: "Turn this on to enable HTTP keepalive support. Default value is true", + Required: false, + Type: "bool", + DefaultValue: "true", + PossibleValues: []string{}, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "RequestTimeout", + Alias: "request_timeout", + Doc: "Timeout (in seconds) for the entire request. Default value is 60", + Required: false, + Type: "uint", + DefaultValue: "30", + PossibleValues: []string{}, + ExampleLS: "", + }, + &doc.ProcessorOption{ + Name: "RetryInterval", + Alias: "retry_interval", + Doc: "", + Required: false, + Type: "uint", + DefaultValue: "10", + PossibleValues: []string{}, + ExampleLS: "", + }, + }, + }, + Ports: []*doc.ProcessorPort{}, +} +} \ No newline at end of file diff --git a/processors/output-tcp/readme.md b/processors/output-tcp/readme.md new file mode 100644 index 00000000..5b212dda --- /dev/null +++ b/processors/output-tcp/readme.md @@ -0,0 +1,71 @@ +# TCPOUTPUT + + +## Synopsys + + +| SETTING | TYPE | REQUIRED | DEFAULT VALUE | +|-----------------|--------|----------|---------------| +| codec | codec | false | "line" | +| host | string | true | "" | +| port | uint | true | ? | +| keepalive | bool | false | true | +| request_timeout | uint | false | 30 | +| retry_interval | uint | false | 10 | + + +## Details + +### codec +* Value type is codec +* Default value is `"line"` + +The codec used for input data. Input codecs are a convenient method for decoding +your data before it enters the input, without needing a separate filter in your bitfan pipeline + +### host +* This is a required setting. +* Value type is string +* Default value is `""` + + + +### port +* This is a required setting. +* Value type is uint +* Default value is `?` + + + +### keepalive +* Value type is bool +* Default value is `true` + +Turn this on to enable HTTP keepalive support. Default value is true + +### request_timeout +* Value type is uint +* Default value is `30` + +Timeout (in seconds) for the entire request. Default value is 60 + +### retry_interval +* Value type is uint +* Default value is `10` + + + + + +## Configuration blueprint + +``` +tcpoutput{ + codec => "line" + host => "" + port => uint + keepalive => true + request_timeout => 30 + retry_interval => 10 +} +``` diff --git a/processors/output-tcp/tcpoutput.go b/processors/output-tcp/tcpoutput.go new file mode 100644 index 00000000..390180ca --- /dev/null +++ b/processors/output-tcp/tcpoutput.go @@ -0,0 +1,124 @@ +//go:generate bitfanDoc + +package tcpoutput + +import ( + "bufio" + "bytes" + "fmt" + "net" + "time" + + "github.com/vjeantet/bitfan/codecs" + "github.com/vjeantet/bitfan/processors" +) + +func New() processors.Processor { + return &processor{opt: &options{}} +} + +type processor struct { + conn net.Conn + processors.Base + enc codecs.Encoder + opt *options +} + +type options struct { + // The codec used for input data. Input codecs are a convenient method for decoding + // your data before it enters the input, without needing a separate filter in your bitfan pipeline + // @Default "line" + // @Enum "json","line","pp","rubydebug" + // @Type codec + Codec codecs.CodecCollection `mapstructure:"codec"` + + Host string `mapstructure:"host" validate:"required"` + + Port uint `mapstructure:"port" validate:"required"` + + // Turn this on to enable HTTP keepalive support. Default value is true + // @Default true + KeepAlive bool `mapstructure:"keepalive"` + + // Timeout (in seconds) for the entire request. Default value is 60 + // @Default 30 + RequestTimeout uint `mapstructure:"request_timeout"` + + // @Default 10 + RetryInterval uint `mapstructure:"retry_interval"` + + // Add any number of arbitrary tags to your event. There is no default value for this setting. + // This can help with processing later. Tags can be dynamic and include parts of the event using the %{field} syntax. + // Tags []string `mapstructure:"tags"` +} + +func (p *processor) Configure(ctx processors.ProcessorContext, conf map[string]interface{}) error { + defaults := options{ + KeepAlive: true, + RequestTimeout: 30, + RetryInterval: 10, + Codec: codecs.CodecCollection{ + Enc: codecs.New("line", nil, ctx.Log(), ctx.ConfigWorkingLocation()), + }, + } + p.opt = &defaults + + return p.ConfigureAndValidate(ctx, conf, p.opt) +} + +func (p *processor) Receive(e processors.IPacket) error { + if err := p.connect(); err != nil { + time.Sleep(time.Duration(p.opt.RetryInterval) * time.Second) + return err + } + + var body bytes.Buffer + writer := bufio.NewWriter(&body) + enc, err := p.opt.Codec.NewEncoder(writer) + if err != nil { + return fmt.Errorf("Codec failed with: %v", err) + } + if err := enc.Encode(e.Fields().Old()); err != nil { + return fmt.Errorf("Can't encode item with error: %v", err) + } + if err := writer.Flush(); err != nil { + return err + } + p.conn.SetDeadline(time.Now().Add(time.Duration(p.opt.RequestTimeout) * time.Second)) + if _, err := p.conn.Write(body.Bytes()); err != nil { + p.conn.Close() + p.conn = nil + return err + } + return nil +} + +func (p *processor) Start(e processors.IPacket) error { + return p.connect() +} + +func (p *processor) Stop(e processors.IPacket) error { + if p.conn != nil { + return p.conn.Close() + } + return nil +} + +func (p *processor) connect() error { + var ( + addr *net.TCPAddr + err error + ) + if p.conn == nil { + if addr, err = net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", p.opt.Host, p.opt.Port)); err != nil { + return err + } + if p.conn, err = net.DialTCP("tcp", nil, addr); err != nil { + return err + } + p.conn.(*net.TCPConn).SetNoDelay(false) + p.conn.(*net.TCPConn).SetKeepAlive(p.opt.KeepAlive) + p.conn.(*net.TCPConn).SetKeepAlivePeriod(30 * time.Second) + } + return nil +} diff --git a/processors/output-tcp/tcpoutput_test.go b/processors/output-tcp/tcpoutput_test.go new file mode 100644 index 00000000..6520ceb4 --- /dev/null +++ b/processors/output-tcp/tcpoutput_test.go @@ -0,0 +1,86 @@ +package tcpoutput + +import ( + "fmt" + "net" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/vjeantet/bitfan/codecs" + "github.com/vjeantet/bitfan/processors/doc" + "github.com/vjeantet/bitfan/processors/testutils" +) + +func TestNew(t *testing.T) { + p := New() + _, ok := p.(*processor) + assert.Equal(t, ok, true, "New() should return a processor struct") +} +func TestDoc(t *testing.T) { + assert.IsType(t, &doc.Processor{}, New().(*processor).Doc()) +} + +func TestLine(t *testing.T) { + server, client := net.Pipe() + srv := &testServer{conn: server} + srv.Start() + p := New().(*processor) + ctx := testutils.NewProcessorContext() + conf := map[string]interface{}{ + "host": "localhost", + "port": 3000, + "codec": codecs.CodecCollection{ + Enc: codecs.New("line", map[string]interface{}{ + "format": "{{.message}}", + }, ctx.Log(), ctx.ConfigWorkingLocation()), + }, + } + p.conn = client + + assert.NoError(t, p.Configure(ctx, conf), "configuration is correct, error should be nil") + assert.NoError(t, p.Start(nil)) + assert.NoError(t, p.Receive(testutils.NewPacketOld("message1", map[string]interface{}{"abc": "def1", "n": 123}))) + assert.NoError(t, p.Receive(testutils.NewPacketOld("message2", map[string]interface{}{"abc": "def2", "n": 456}))) + assert.Equal(t, "message1\n", srv.GetMessage()) + assert.Equal(t, "message2\n", srv.GetMessage()) + assert.NoError(t, p.Stop(nil)) + srv.Stop() +} + +type testServer struct { + conn net.Conn + stringChan chan string +} + +func (t *testServer) Start() { + t.stringChan = make(chan string, 10) + go func() error { + for { + buf := make([]byte, 1024) + n, err := t.conn.Read(buf) + if err != nil { + fmt.Println(err) + return err + } + if n == 0 { + fmt.Println("n=0") + continue + } + t.stringChan <- string(buf[:n]) + } + }() +} + +func (t *testServer) Stop() { + t.conn.Close() + close(t.stringChan) +} + +func (t *testServer) GetMessage() string { + select { + case m := <-t.stringChan: + return m + default: + return "" + } +}