diff --git a/entrypoint/parser/pipelinebuilder.go b/entrypoint/parser/pipelinebuilder.go index a5e29bfd..7675ce3c 100644 --- a/entrypoint/parser/pipelinebuilder.go +++ b/entrypoint/parser/pipelinebuilder.go @@ -5,6 +5,7 @@ import ( "fmt" "strconv" + "github.com/k0kubun/pp" "github.com/vjeantet/bitfan/core" "github.com/vjeantet/bitfan/entrypoint/parser/logstash" ) @@ -52,7 +53,7 @@ func buildAgents(content []byte, pwd string, pickSections ...string) ([]core.Age for pluginIndex := 0; pluginIndex < len(LSConfiguration.Sections["input"].Plugins); pluginIndex++ { plugin := LSConfiguration.Sections["input"].Plugins[pluginIndex] - agents, tmpOutPorts, err := buildInputAgents(plugin, pwd) + agents, tmpOutPorts, err := buildInputAgents(plugin, nil, pwd) if err != nil { return nil, err } @@ -83,7 +84,7 @@ func buildAgents(content []byte, pwd string, pickSections ...string) ([]core.Age var agents []core.Agent i++ plugin := LSConfiguration.Sections["output"].Plugins[pluginIndex] - agents, err = buildOutputAgents(plugin, outPorts, pwd) + agents, _, err = buildOutputAgents(plugin, outPorts, pwd) if err != nil { return nil, err } @@ -96,44 +97,8 @@ func buildAgents(content []byte, pwd string, pickSections ...string) ([]core.Age } // TODO : this should return ports to be able to use multiple path use -func buildInputAgents(plugin *logstash.Plugin, pwd string) ([]core.Agent, []core.Port, error) { - - var agent core.Agent - agent = core.NewAgent() - agent.Type = "input_" + plugin.Name - if plugin.Label == "" { - agent.Label = plugin.Name - } else { - agent.Label = plugin.Label - } - agent.Buffer = 20 - agent.PoolSize = 1 - agent.Wd = pwd - - // Plugin configuration - agent.Options = map[string]interface{}{} - for _, setting := range plugin.Settings { - agent.Options[setting.K] = setting.V - } - - // handle codec - if len(plugin.Codecs) > 0 { - codecs := map[int]interface{}{} - for i, codec := range plugin.Codecs { - if codec.Name != "" { - pcodec := core.NewCodec(codec.Name) - for _, setting := range codec.Settings { - pcodec.Options[setting.K] = setting.V - if setting.K == "role" { - pcodec.Role = setting.V.(string) - } - } - - codecs[i] = pcodec - } - } - agent.Options["codecs"] = codecs - } +func buildInputAgents(plugin *logstash.Plugin, lastOutPorts []core.Port, pwd string) ([]core.Agent, []core.Port, error) { + agent := newAgent(plugin, pwd, "input_") // If agent is a "use" // build imported pipeline from path @@ -186,83 +151,19 @@ func buildInputAgents(plugin *logstash.Plugin, pwd string) ([]core.Agent, []core } // interval can be a number, a string number or a cron string pattern - interval := agent.Options["interval"] - switch t := interval.(type) { - case int, int8, int16, int32, int64: - agent.Schedule = fmt.Sprintf("@every %ds", t) - case string: - if i, err := strconv.Atoi(t); err == nil { - agent.Schedule = fmt.Sprintf("@every %ds", i) - } else { - agent.Schedule = t - } - } + setAgentInterval(&agent) // @see commit dbeb4015a88893bffd6334d38f34f978312eff82 - if trace, ok := agent.Options["trace"]; ok { - switch t := trace.(type) { - case string: - agent.Trace = true - case bool: - agent.Trace = t - } - } + setAgentTrace(&agent) - if workers, ok := agent.Options["workers"]; ok { - switch t := workers.(type) { - case int64: - agent.PoolSize = int(t) - case int32: - agent.PoolSize = int(t) - case string: - if i, err := strconv.Atoi(t); err == nil { - agent.PoolSize = i - } - } - } + setAgentPoolSize(&agent) outPort := core.Port{AgentID: agent.ID, PortNumber: 0} return []core.Agent{agent}, []core.Port{outPort}, nil } -func buildOutputAgents(plugin *logstash.Plugin, lastOutPorts []core.Port, pwd string) ([]core.Agent, error) { - agent_list := []core.Agent{} - - var agent core.Agent - agent = core.NewAgent() - agent.Type = "output_" + plugin.Name - if plugin.Label == "" { - agent.Label = plugin.Name - } else { - agent.Label = plugin.Label - } - agent.Buffer = 20 - agent.PoolSize = 1 - agent.Wd = pwd - - // Plugin configuration - agent.Options = map[string]interface{}{} - for _, setting := range plugin.Settings { - agent.Options[setting.K] = setting.V - } - - // handle codec - if len(plugin.Codecs) > 0 { - codecs := map[int]interface{}{} - for i, codec := range plugin.Codecs { - if codec.Name != "" { - pcodec := core.NewCodec(codec.Name) - for _, setting := range codec.Settings { - pcodec.Options[setting.K] = setting.V - if setting.K == "role" { - pcodec.Role = setting.V.(string) - } - } - codecs[i] = pcodec - } - } - agent.Options["codecs"] = codecs - } +func buildOutputAgents(plugin *logstash.Plugin, lastOutPorts []core.Port, pwd string) ([]core.Agent, []core.Port, error) { + agent := newAgent(plugin, pwd, "output_") // if its a use plugin // load filter and output parts of pipeline @@ -280,7 +181,7 @@ func buildOutputAgents(plugin *logstash.Plugin, lastOutPorts []core.Port, pwd st agent.Options["path"] = []string{v.(string)} fileConfigAgents, err := parseConfigLocation(v.(string), agent.Options, pwd, "filter", "output") if err != nil { - return nil, err + return nil, nil, err } firstUsedAgent := &fileConfigAgents[len(fileConfigAgents)-1] @@ -290,14 +191,14 @@ func buildOutputAgents(plugin *logstash.Plugin, lastOutPorts []core.Port, pwd st } //specific to output - return fileConfigAgents, nil + return fileConfigAgents, nil, nil case []interface{}: CombinedFileConfigAgents := []core.Agent{} for _, p := range v.([]interface{}) { fileConfigAgents, err := parseConfigLocation(p.(string), agent.Options, pwd, "filter", "output") if err != nil { - return nil, err + return nil, nil, err } firstUsedAgent := &fileConfigAgents[len(fileConfigAgents)-1] @@ -308,7 +209,7 @@ func buildOutputAgents(plugin *logstash.Plugin, lastOutPorts []core.Port, pwd st CombinedFileConfigAgents = append(CombinedFileConfigAgents, fileConfigAgents...) } // return pipeline a b c ... with theirs respectives outputs - return CombinedFileConfigAgents, nil + return CombinedFileConfigAgents, nil, nil } } } @@ -320,99 +221,26 @@ func buildOutputAgents(plugin *logstash.Plugin, lastOutPorts []core.Port, pwd st agent.AgentSources = append(agent.AgentSources, inPort) } - // if plugin.Codec != nil { - // agent.Options["codec"] = plugin.Codec.Name - // } - + agent_list := []core.Agent{} // Is this Plugin has conditional expressions ? if len(plugin.When) > 0 { - // outPorts_when := []port{} - // le plugin WHEn est $plugin - agent.Options["expressions"] = map[int]string{} - // Loop over expressions in correct order - for expressionIndex := 0; expressionIndex < len(plugin.When); expressionIndex++ { - when := plugin.When[expressionIndex] - // enregistrer l'expression dans la conf agent - agent.Options["expressions"].(map[int]string)[expressionIndex] = when.Expression - - // recupérer le outport associé (expressionIndex) - expressionOutPorts := []core.Port{ - {AgentID: agent.ID, PortNumber: expressionIndex}, - } - - // construire les plugins associés à l'expression - // en utilisant le expressionOutPorts - for pi := 0; pi < len(when.Plugins); pi++ { - p := when.Plugins[pi] - var agents []core.Agent - var err error - // récupérer le dernier outport du plugin créé il devient expressionOutPorts - agents, err = buildOutputAgents(p, expressionOutPorts, pwd) - if err != nil { - return nil, err - } - - // ajoute l'agent à la liste des agents - agent_list = append(agents, agent_list...) - } + var err error + if agent_list, _, err = buildWhenBranch(&agent, plugin.When, "output"); err != nil { + return nil, nil, err } } // @see commit dbeb4015a88893bffd6334d38f34f978312eff82 - if trace, ok := agent.Options["trace"]; ok { - switch t := trace.(type) { - case string: - agent.Trace = true - case bool: - agent.Trace = t - } - } + setAgentTrace(&agent) // ajoute l'agent à la liste des agents agent_list = append([]core.Agent{agent}, agent_list...) - return agent_list, nil + return agent_list, nil, nil } func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []core.Port, pwd string) ([]core.Agent, []core.Port, error) { - - agent_list := []core.Agent{} - - var agent core.Agent - agent = core.NewAgent() - agent.Type = plugin.Name - if plugin.Label == "" { - agent.Label = plugin.Name - } else { - agent.Label = plugin.Label - } - - agent.Buffer = 20 + agent := newAgent(plugin, pwd, "") agent.PoolSize = 2 - agent.Wd = pwd - - // Plugin configuration - agent.Options = map[string]interface{}{} - for _, setting := range plugin.Settings { - agent.Options[setting.K] = setting.V - } - - // handle codec - if len(plugin.Codecs) > 0 { - codecs := map[int]interface{}{} - for i, codec := range plugin.Codecs { - if codec.Name != "" { - pcodec := core.NewCodec(codec.Name) - for _, setting := range codec.Settings { - pcodec.Options[setting.K] = setting.V - if setting.K == "role" { - pcodec.Role = setting.V.(string) - } - } - codecs[i] = pcodec - } - } - agent.Options["codecs"] = codecs - } // handle use plugin // If its a use agent @@ -498,6 +326,92 @@ func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []core.Port, pwd st } // interval can be a number, a string number or a cron string pattern + setAgentInterval(&agent) + + // @see commit dbeb4015a88893bffd6334d38f34f978312eff82 + setAgentTrace(&agent) + + setAgentPoolSize(&agent) + + // Plugin Sources + agent.AgentSources = core.PortList{} + for _, sourceport := range lastOutPorts { + inPort := core.Port{AgentID: sourceport.AgentID, PortNumber: sourceport.PortNumber} + agent.AgentSources = append(agent.AgentSources, inPort) + } + + // By Default Agents output to port 0 + newOutPorts := []core.Port{ + {AgentID: agent.ID, PortNumber: 0}, + } + + agent_list := []core.Agent{} + + // Is this Plugin has conditional expressions ? + if len(plugin.When) > 0 { + var err error + if agent_list, newOutPorts, err = buildWhenBranch(&agent, plugin.When, "filter"); err != nil { + return nil, nil, err + } + } + + // ajoute l'agent à la liste des agents + agent_list = append([]core.Agent{agent}, agent_list...) + return agent_list, newOutPorts, nil +} + +func isInSlice(needle string, candidates []string) bool { + for _, symbolType := range candidates { + if needle == symbolType { + return true + } + } + return false +} + +func newAgent(plugin *logstash.Plugin, pwd string, labelPrefix string) core.Agent { + var agent = core.Agent{} + agent = core.NewAgent() + agent.Type = labelPrefix + plugin.Name + if plugin.Label == "" { + agent.Label = plugin.Name + } else { + agent.Label = plugin.Label + } + + agent.Buffer = 20 + agent.PoolSize = 1 + agent.Wd = pwd + + // Plugin configuration + agent.Options = map[string]interface{}{} + for _, setting := range plugin.Settings { + agent.Options[setting.K] = setting.V + } + + // handle codecs + if len(plugin.Codecs) > 0 { + codecs := map[int]interface{}{} + for i, codec := range plugin.Codecs { + if codec.Name != "" { + pcodec := core.NewCodec(codec.Name) + for _, setting := range codec.Settings { + pcodec.Options[setting.K] = setting.V + if setting.K == "role" { + pcodec.Role = setting.V.(string) + } + } + + codecs[i] = pcodec + } + } + agent.Options["codecs"] = codecs + } + + return agent +} + +func setAgentInterval(agent *core.Agent) { interval := agent.Options["interval"] switch t := interval.(type) { case int, int8, int16, int32, int64: @@ -509,8 +423,8 @@ func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []core.Port, pwd st agent.Schedule = t } } - - // @see commit dbeb4015a88893bffd6334d38f34f978312eff82 +} +func setAgentTrace(agent *core.Agent) { if trace, ok := agent.Options["trace"]; ok { switch t := trace.(type) { case string: @@ -519,7 +433,8 @@ func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []core.Port, pwd st agent.Trace = t } } - +} +func setAgentPoolSize(agent *core.Agent) { if workers, ok := agent.Options["workers"]; ok { switch t := workers.(type) { case int64: @@ -532,78 +447,61 @@ func buildFilterAgents(plugin *logstash.Plugin, lastOutPorts []core.Port, pwd st } } } + return +} - // Plugin Sources - agent.AgentSources = core.PortList{} - for _, sourceport := range lastOutPorts { - inPort := core.Port{AgentID: sourceport.AgentID, PortNumber: sourceport.PortNumber} - agent.AgentSources = append(agent.AgentSources, inPort) - } - - // By Default Agents output to port 0 - newOutPorts := []core.Port{ - {AgentID: agent.ID, PortNumber: 0}, - } +func buildWhenBranch(agent *core.Agent, Whens map[int]*logstash.When, sectionType string) ([]core.Agent, []core.Port, error) { + agent_list := []core.Agent{} + outPorts_when := []core.Port{} + // le plugin WHEn est $plugin + agent.Options["expressions"] = map[int]string{} + elseOK := false + // Loop over expressions in correct order + for expressionIndex := 0; expressionIndex < len(Whens); expressionIndex++ { + when := Whens[expressionIndex] + // enregistrer l'expression dans la conf agent + agent.Options["expressions"].(map[int]string)[expressionIndex] = when.Expression + if when.Expression == "true" { + elseOK = true + } + // recupérer le outport associé (expressionIndex) + expressionOutPorts := []core.Port{ + {AgentID: agent.ID, PortNumber: expressionIndex}, + } - // Is this Plugin has conditional expressions ? - if len(plugin.When) > 0 { - outPorts_when := []core.Port{} - // le plugin WHEn est $plugin - agent.Options["expressions"] = map[int]string{} - elseOK := false - // Loop over expressions in correct order - for expressionIndex := 0; expressionIndex < len(plugin.When); expressionIndex++ { - when := plugin.When[expressionIndex] - // enregistrer l'expression dans la conf agent - agent.Options["expressions"].(map[int]string)[expressionIndex] = when.Expression - if when.Expression == "true" { - elseOK = true - } - // recupérer le outport associé (expressionIndex) - expressionOutPorts := []core.Port{ - {AgentID: agent.ID, PortNumber: expressionIndex}, + // construire les plugins associés à l'expression + // en utilisant le outportA + for pi := 0; pi < len(when.Plugins); pi++ { + p := when.Plugins[pi] + var agents []core.Agent + var err error + // récupérer le dernier outport du plugin créé il devient outportA + if sectionType == "filter" { + agents, expressionOutPorts, err = buildFilterAgents(p, expressionOutPorts, agent.Wd) + } else if sectionType == "output" { + agents, _, err = buildOutputAgents(p, expressionOutPorts, agent.Wd) + pp.Println("agents-->", agents) } - // construire les plugins associés à l'expression - // en utilisant le outportA - for pi := 0; pi < len(when.Plugins); pi++ { - p := when.Plugins[pi] - var agents []core.Agent - var err error - // récupérer le dernier outport du plugin créé il devient outportA - agents, expressionOutPorts, err = buildFilterAgents(p, expressionOutPorts, pwd) - if err != nil { - return nil, nil, err - } - - // ajoute l'agent à la liste des agents - agent_list = append(agents, agent_list...) + if err != nil { + return nil, nil, err } - // ajouter le dernier outportA de l'expression au outport final du when - outPorts_when = append(expressionOutPorts, outPorts_when...) - } - newOutPorts = outPorts_when - // If no else expression was found, insert one - if elseOK == false { - agent.Options["expressions"].(map[int]string)[len(agent.Options["expressions"].(map[int]string))] = "true" - elseOutPorts := []core.Port{ - {AgentID: agent.ID, PortNumber: len(agent.Options["expressions"].(map[int]string)) - 1}, - } - newOutPorts = append(elseOutPorts, newOutPorts...) + // ajoute l'agent à la liste des agents + agent_list = append(agents, agent_list...) } + // ajouter le dernier outportA de l'expression au outport final du when + outPorts_when = append(expressionOutPorts, outPorts_when...) } - // ajoute l'agent à la liste des agents - agent_list = append([]core.Agent{agent}, agent_list...) - return agent_list, newOutPorts, nil -} - -func isInSlice(needle string, candidates []string) bool { - for _, symbolType := range candidates { - if needle == symbolType { - return true + // If no else expression was found, insert one + if elseOK == false { + agent.Options["expressions"].(map[int]string)[len(agent.Options["expressions"].(map[int]string))] = "true" + elseOutPorts := []core.Port{ + {AgentID: agent.ID, PortNumber: len(agent.Options["expressions"].(map[int]string)) - 1}, } + outPorts_when = append(elseOutPorts, outPorts_when...) } - return false + + return agent_list, outPorts_when, nil } diff --git a/examples.d/if.conf b/examples.d/if.conf index 7caba5b9..9acfad2c 100644 --- a/examples.d/if.conf +++ b/examples.d/if.conf @@ -2,7 +2,7 @@ input { stdout{ codec=>line { - format => "Hello ${USER}, type 'test' to output to rubydebug" + format => "Hello ${USER}, type 'test1', 'test2','test3' or anything to test if" delimiter => " : " } } @@ -10,14 +10,31 @@ input { } } +filter{ + if [message] == "test1" { + mutate { add_field => {"A" => "cas 1"} } + mutate { uppercase => ["A"] } + mutate { add_field => {"AA" => "%{A}++"} } + }else if [message] == "test2" { + mutate { add_field => {"A" => "cas 2"} } + mutate { uppercase => ["A"] } + }else{ + mutate { add_field => {"A" => "cas 3"} } + mutate { uppercase => ["A"] } + } +} + output { if [message] == "test" { stdout{ codec => rubydebug } + stdout{ + codec => json + } }else{ stdout{ - codec => line + codec => rubydebug } } } \ No newline at end of file diff --git a/go.list-duplcode.sh b/go.list-duplcode.sh new file mode 100755 index 00000000..c0de7fce --- /dev/null +++ b/go.list-duplcode.sh @@ -0,0 +1 @@ +find . -name "*docdoc.go" -type f -delete ;dupl -html -t 50 $(find . -path ./vendor -prune -o -name *.go) > dup.html \ No newline at end of file