Skip to content

Commit

Permalink
Add output splitting by tag and key fields (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiping-s authored Jan 17, 2022
1 parent e99106a commit 9564c29
Show file tree
Hide file tree
Showing 20 changed files with 1,178 additions and 396 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ Dump contents of Fluentd Forward messages (Forward, PackedForward, CompressedPac
fluentlibtool dump [filepath]...
```

Run a fake Fluentd server to print all logs in JSON to file or stdout (pass "-" as filename)
Run a fake Fluentd server to print logs in JSON to stdout or one file per each tag + key fields

```bash
fluentlibtool server -f 0 -x 0 -n 0 output.json
fluentlibtool server -f 0 -x 0 -n 0.5
```

```bash
fluentlibtool server --split_output_keys=environment/class,level --split_output_path=/tmp/%s.json --split_strict_mode=true
```

(`-f`, `-x`, and `-n` are to simulate network errors etc, use `fluentlibtool help server` to get help)
Expand Down
2 changes: 1 addition & 1 deletion cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
func init() {
config.AddParentCmdWithArgs("", "Tools for Fluentd / Fluent Bit", nil, nil, nil)
config.AddCmdWithArgs("dump <path-to-files-or-dirs>...", "Dump given files or dirs. Support Fluent Bit chunk files (.flb) and Fluentd Forward messages in msgpack format", &dumpCmd, dumpCmd.Run)
config.AddCmdWithArgs("server <output_file>", "Run a test server for Fluentd Forward Protocol and output logs in JSON.", &serverCmd, serverCmd.Run)
config.AddCmdWithArgs("server", "Run a test server for Fluentd Forward Protocol and output logs in JSON.", &serverCmd, serverCmd.Run)
}

// Execute parses command-line and executes the root command
Expand Down
23 changes: 22 additions & 1 deletion cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"syscall"

"github.com/relex/fluentlib/server"
"github.com/relex/fluentlib/server/receivers"
"github.com/relex/gotils/logger"
)

Expand All @@ -18,6 +19,9 @@ var serverCmd = serverCmdState{
Address: "localhost:24224",
Secret: "guess",
TLS: true,
SplitOutputKeys: []string{"app", "level", "pnum"},
SplitOutputPath: "",
SplitStrictMode: false,
RandomNoHandshake: 0.0,
RandomFailAuth: 0.0,
RandomNoReceiving: 0.0,
Expand All @@ -27,7 +31,23 @@ var serverCmd = serverCmdState{
}

func (cmd *serverCmdState) Run(args []string) {
srv, _ := server.LaunchServer(logger.Root(), cmd.Config, server.NewMessageWriter(os.Stdout))
var receiver receivers.Receiver
if len(serverCmd.SplitOutputPath) > 0 {
if err := receivers.VerifySplittingFilePath(serverCmd.SplitOutputPath); err != nil {
logger.Fatal("invalid split_output_path: ", err.Error())
}

logger.WithFields(logger.Fields{
"keys": serverCmd.SplitOutputKeys,
"path": serverCmd.SplitOutputPath,
"strict": serverCmd.SplitStrictMode,
}).Infof("use split output")
receiver = receivers.NewSplittingFileWriter(serverCmd.SplitOutputKeys, serverCmd.SplitOutputPath, serverCmd.SplitStrictMode)
} else {
logger.Infof("use message output")
receiver = receivers.NewMessageWriter(os.Stdout)
}
srv, _ := server.LaunchServer(logger.Root(), cmd.Config, receiver)

sigChan := make(chan os.Signal, 10)
signal.Notify(sigChan, syscall.SIGINT)
Expand All @@ -38,4 +58,5 @@ func (cmd *serverCmdState) Run(args []string) {

srv.Shutdown()
logger.Info("server stopped")
logger.Exit(0)
}
7 changes: 4 additions & 3 deletions dump/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func PrintChunkFileInJSON(path string, indented bool, writer io.Writer) error {
lastI := -1
iterError := fluentbitchunk.IterateRecords(flbPayload, func(event forwardprotocol.EventEntry) error {
lastI++
return printEventInJSON(event, flbTag, indented, writer, lastI == 0)
return PrintEventInJSON(event, flbTag, indented, writer, lastI == 0)
})
if iterError != nil {
return fmt.Errorf("corrupted fluent-bit chunk file %s on the %dth record: %w", path, lastI, iterError)
Expand All @@ -52,7 +52,7 @@ func PrintChunkFileInJSON(path string, indented bool, writer io.Writer) error {
// PrintMessageInJSON dumps all logs in the given message in JSON format. Each log (event) is followed by a newline.
func PrintMessageInJSON(message forwardprotocol.Message, indented bool, writer io.Writer) error {
for i, event := range message.Entries {
if err := printEventInJSON(event, message.Tag, indented, writer, i == 0); err != nil {
if err := PrintEventInJSON(event, message.Tag, indented, writer, i == 0); err != nil {
_, _ = writer.Write([]byte("\n]\n")) // ignore error
return err
}
Expand All @@ -63,7 +63,8 @@ func PrintMessageInJSON(message forwardprotocol.Message, indented bool, writer i
return nil
}

func printEventInJSON(event forwardprotocol.EventEntry, tag string, indented bool, writer io.Writer, isFirst bool) error {
// PrintEventInJSON dump a single record in JSON format
func PrintEventInJSON(event forwardprotocol.EventEntry, tag string, indented bool, writer io.Writer, isFirst bool) error {
if isFirst {
if _, werr := writer.Write([]byte("[\n")); werr != nil {
return fmt.Errorf("failed to print leading bracket: %w", werr)
Expand Down
17 changes: 9 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ module github.com/relex/fluentlib
go 1.16

require (
github.com/prometheus/common v0.23.0 // indirect
github.com/relex/gotils v0.0.0-20210507093346-88c86b7c95a8
github.com/spf13/cobra v1.1.3
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/relex/gotils v0.0.0-20220113070232-66dfd9701ae6
github.com/spf13/afero v1.8.0 // indirect
github.com/spf13/cobra v1.3.0 // indirect
github.com/spf13/viper v1.10.1 // indirect
github.com/stretchr/testify v1.7.0
github.com/vmihailenco/msgpack/v4 v4.3.12
github.com/vmihailenco/tagparser v0.1.2 // indirect
golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf // indirect
golang.org/x/net v0.0.0-20210505214959-0714010a04ed // indirect
golang.org/x/sys v0.0.0-20210507014357-30e306a8bba5 // indirect
golang.org/x/term v0.0.0-20210503060354-a79de5458b56 // indirect
google.golang.org/appengine v1.6.7 // indirect
golang.org/x/net v0.0.0-20220114011407-0dd24b26b47d // indirect
golang.org/x/sys v0.0.0-20220111092808-5a964db01320 // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
)
Loading

0 comments on commit 9564c29

Please sign in to comment.