Skip to content

Commit

Permalink
Merge pull request onflow#6176 from onflow/fxamacker/support-jsonl-mi…
Browse files Browse the repository at this point in the history
…gratino-report

Add support for JSONL report format as alternative to JSON array report
  • Loading branch information
fxamacker authored Jul 8, 2024
2 parents bfb4f89 + c5bfc47 commit acbcf4a
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 43 deletions.
2 changes: 1 addition & 1 deletion cmd/util/cmd/diff-states/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func run(*cobra.Command, []string) {
log.Fatal().Msg("--state-commitment-2 must be provided when --state-2 is provided")
}

rw := reporters.NewReportFileWriterFactory(flagOutputDirectory, log.Logger).
rw := reporters.NewReportFileWriterFactoryWithFormat(flagOutputDirectory, log.Logger, reporters.ReportFormatJSONL).
ReportWriter(ReporterName)
defer rw.Close()

Expand Down
28 changes: 19 additions & 9 deletions cmd/util/cmd/diff-states/diff_states_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package diff_states

import (
"encoding/json"
"io"
"io/fs"
"os"
"path/filepath"
"strings"
"testing"
Expand All @@ -15,7 +17,6 @@ import (
"github.com/onflow/flow-go/ledger"
"github.com/onflow/flow-go/ledger/common/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/utils/io"
"github.com/onflow/flow-go/utils/unittest"
)

Expand Down Expand Up @@ -80,17 +81,26 @@ func TestDiffStates(t *testing.T) {
require.NoError(t, err)
require.NotEmpty(t, reportPath)

report, err := io.ReadFile(reportPath)
report, err := os.Open(reportPath)
require.NoError(t, err)

var msgs []any
err = json.Unmarshal(report, &msgs)
require.NoError(t, err)
var msgs [][]byte
decoder := json.NewDecoder(report)
for {
var msg json.RawMessage
err = decoder.Decode(&msg)
if err == io.EOF {
break
}
require.NoError(t, err)

msgs = append(msgs, msg)
}

assert.Equal(t, 4, len(msgs))
assert.Containsf(t, string(report), `{"kind":"raw-diff","owner":"0100000000000000","key":"62","value1":"03","value2":"05"}`, "diff report contains raw-diff")
assert.Containsf(t, string(report), `{"kind":"account-missing","owner":"0200000000000000","state":2}`, "diff report contains account-missing for 0200000000000000")
assert.Containsf(t, string(report), `{"kind":"account-missing","owner":"0300000000000000","state":1}`, "diff report contains account-missing for 0300000000000000")
assert.Containsf(t, string(report), `{"kind":"account-missing","owner":"0400000000000000","state":1}`, "diff report contains account-missing for 0400000000000000")
assert.Containsf(t, msgs, []byte(`{"kind":"account-missing","owner":"0200000000000000","state":2}`), "diff report contains account-missing for 0200000000000000")
assert.Containsf(t, msgs, []byte(`{"kind":"account-missing","owner":"0300000000000000","state":1}`), "diff report contains account-missing for 0300000000000000")
assert.Containsf(t, msgs, []byte(`{"kind":"account-missing","owner":"0400000000000000","state":1}`), "diff report contains account-missing for 0400000000000000")
assert.Containsf(t, msgs, []byte(`{"kind":"raw-diff","owner":"0100000000000000","key":"62","value1":"03","value2":"05"}`), "diff report contains raw-diff")
})
}
102 changes: 74 additions & 28 deletions cmd/util/ledger/reporters/reporter_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,51 @@ type ReportFileWriterFactory struct {
fileSuffix int32
outputDir string
log zerolog.Logger
format ReportFormat
}

// ReportFormat selects how a report file writer frames the JSON records it
// writes. It determines both the file extension chosen by
// ReportFileWriterFactory.Filename and the delimiters emitted between records.
type ReportFormat uint8

const (
// ReportFormatJSONArray represents format encoded as JSON array at the top level.
// It is the zero value of ReportFormat and the format used by
// NewReportFileWriterFactory.
ReportFormatJSONArray ReportFormat = iota

// ReportFormatJSONL represents format encoded as JSONL: records are
// separated by a line break and there is no enclosing array.
// ReportFormatJSONL should be used when report might be large enough to
// crash tools like jq (if JSON array is used instead of JSONL).
ReportFormatJSONL
)

// NewReportFileWriterFactory returns a factory whose report writers use the
// default report format, ReportFormatJSONArray. Use
// NewReportFileWriterFactoryWithFormat to pick a different format.
func NewReportFileWriterFactory(outputDir string, log zerolog.Logger) *ReportFileWriterFactory {
	const defaultFormat = ReportFormatJSONArray

	return NewReportFileWriterFactoryWithFormat(outputDir, log, defaultFormat)
}

// NewReportFileWriterFactoryWithFormat returns a factory that places report
// files in outputDir and writes them in the given format.
//
// The factory's file suffix is the current Unix time in seconds (truncated to
// int32), captured once here, so every file produced by the same factory
// shares that suffix.
func NewReportFileWriterFactoryWithFormat(outputDir string, log zerolog.Logger, format ReportFormat) *ReportFileWriterFactory {
	factory := new(ReportFileWriterFactory)
	factory.fileSuffix = int32(time.Now().Unix())
	factory.outputDir = outputDir
	factory.log = log
	factory.format = format

	return factory
}

// Filename returns the path of the report file for dataNamespace inside the
// factory's output directory. The file name has the form
// "<dataNamespace>_<fileSuffix>.<ext>", where ext is "json" for
// ReportFormatJSONArray and "jsonl" for ReportFormatJSONL.
//
// Filename panics if the factory holds an unrecognized report format.
func (r *ReportFileWriterFactory) Filename(dataNamespace string) string {
	// Resolve the extension first so the name is assembled in exactly one
	// place and an unknown format is rejected before any path is built.
	var ext string
	switch r.format {
	case ReportFormatJSONArray:
		ext = "json"

	case ReportFormatJSONL:
		ext = "jsonl"

	default:
		panic(fmt.Sprintf("unrecognized report format: %d", r.format))
	}

	return path.Join(r.outputDir, fmt.Sprintf("%s_%d.%s", dataNamespace, r.fileSuffix, ext))
}

// ReportWriter returns a ReportWriter for dataNamespace. The target file name
// comes from Filename, and the writer is created with the factory's logger and
// report format so the file contents match the extension Filename chose.
func (r *ReportFileWriterFactory) ReportWriter(dataNamespace string) ReportWriter {
	fn := r.Filename(dataNamespace)

	// Forward the format: the writer must frame its records the same way the
	// file name advertises.
	return NewReportFileWriter(fn, r.log, r.format)
}

// Compile-time assertion that *ReportFileWriterFactory implements ReportWriterFactory.
var _ ReportWriterFactory = &ReportFileWriterFactory{}
Expand Down Expand Up @@ -70,13 +97,14 @@ type ReportFileWriter struct {
writeChan chan interface{}
writer *bufio.Writer
log zerolog.Logger
format ReportFormat
faulty bool
firstWrite bool
}

// reportFileWriteBufferSize is the capacity of the buffered writeChan channel
// that NewReportFileWriter creates for each ReportFileWriter.
const reportFileWriteBufferSize = 100

func NewReportFileWriter(fileName string, log zerolog.Logger) ReportWriter {
func NewReportFileWriter(fileName string, log zerolog.Logger, format ReportFormat) ReportWriter {
f, err := os.Create(fileName)
if err != nil {
log.Warn().Err(err).Msg("Error creating ReportFileWriter, defaulting to ReportNilWriter")
Expand All @@ -85,24 +113,26 @@ func NewReportFileWriter(fileName string, log zerolog.Logger) ReportWriter {

writer := bufio.NewWriter(f)

// open a json array
_, err = writer.WriteRune('[')
if format == ReportFormatJSONArray {
// Open top-level JSON array
_, err = writer.WriteRune('[')

if err != nil {
log.Warn().Err(err).Msg("Error opening json array")
// time to clean up
err = writer.Flush()
if err != nil {
log.Error().Err(err).Msg("Error closing flushing writer")
panic(err)
log.Warn().Err(err).Msg("Error opening json array")
// time to clean up
err = writer.Flush()
if err != nil {
log.Error().Err(err).Msg("Error closing flushing writer")
panic(err)
}

err = f.Close()
if err != nil {
log.Error().Err(err).Msg("Error closing report file")
panic(err)
}
return ReportNilWriter{}
}

err = f.Close()
if err != nil {
log.Error().Err(err).Msg("Error closing report file")
panic(err)
}
return ReportNilWriter{}
}

fw := &ReportFileWriter{
Expand All @@ -113,6 +143,7 @@ func NewReportFileWriter(fileName string, log zerolog.Logger) ReportWriter {
firstWrite: true,
writeChan: make(chan interface{}, reportFileWriteBufferSize),
wg: &sync.WaitGroup{},
format: format,
}

fw.wg.Add(1)
Expand Down Expand Up @@ -141,12 +172,23 @@ func (r *ReportFileWriter) write(dataPoint interface{}) {
r.faulty = true
}

// delimit the json records with commas
if !r.firstWrite {
_, err = r.writer.WriteRune(',')
if err != nil {
r.log.Warn().Err(err).Msg("Error Writing json to file")
r.faulty = true
switch r.format {
case ReportFormatJSONArray:
// delimit the json records with commas
_, err = r.writer.WriteRune(',')
if err != nil {
r.log.Warn().Err(err).Msg("Error writing JSON array delimiter to file")
r.faulty = true
}

case ReportFormatJSONL:
// delimit the json records with line break
_, err = r.writer.WriteRune('\n')
if err != nil {
r.log.Warn().Err(err).Msg("Error Writing JSONL delimiter to file")
r.faulty = true
}
}
} else {
r.firstWrite = false
Expand All @@ -163,12 +205,16 @@ func (r *ReportFileWriter) Close() {
close(r.writeChan)
r.wg.Wait()

_, err := r.writer.WriteRune(']')
if err != nil {
r.log.Warn().Err(err).Msg("Error finishing json array")
// nothing to do, we will be closing the file now
if r.format == ReportFormatJSONArray {
// Close top-level json array
_, err := r.writer.WriteRune(']')
if err != nil {
r.log.Warn().Err(err).Msg("Error finishing json array")
// nothing to do, we will be closing the file now
}
}
err = r.writer.Flush()

err := r.writer.Flush()
if err != nil {
r.log.Error().Err(err).Msg("Error closing flushing writer")
panic(err)
Expand Down
78 changes: 73 additions & 5 deletions cmd/util/ledger/reporters/reporter_output_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package reporters_test

import (
"fmt"
"os"
"path"
"sync"
Expand All @@ -12,7 +13,7 @@ import (
"github.com/onflow/flow-go/cmd/util/ledger/reporters"
)

func TestReportFileWriter(t *testing.T) {
func TestReportFileWriterJSONArray(t *testing.T) {
dir := t.TempDir()

filename := path.Join(dir, "test.json")
Expand All @@ -30,20 +31,20 @@ func TestReportFileWriter(t *testing.T) {
}

t.Run("Open & Close - empty json array", func(t *testing.T) {
rw := reporters.NewReportFileWriter(filename, log)
rw := reporters.NewReportFileWriter(filename, log, reporters.ReportFormatJSONArray)
rw.Close()

requireFileContains(t, "[]")
})
t.Run("Open & Write One & Close - json array with one element", func(t *testing.T) {
rw := reporters.NewReportFileWriter(filename, log)
rw := reporters.NewReportFileWriter(filename, log, reporters.ReportFormatJSONArray)
rw.Write(testData{TestField: "something"})
rw.Close()

requireFileContains(t, "[{\"TestField\":\"something\"}]")
})
t.Run("Open & Write Many & Close - json array with many elements", func(t *testing.T) {
rw := reporters.NewReportFileWriter(filename, log)
rw := reporters.NewReportFileWriter(filename, log, reporters.ReportFormatJSONArray)
rw.Write(testData{TestField: "something0"})
rw.Write(testData{TestField: "something1"})
rw.Write(testData{TestField: "something2"})
Expand All @@ -55,7 +56,7 @@ func TestReportFileWriter(t *testing.T) {
})

t.Run("Open & Write Many in threads & Close", func(t *testing.T) {
rw := reporters.NewReportFileWriter(filename, log)
rw := reporters.NewReportFileWriter(filename, log, reporters.ReportFormatJSONArray)

wg := &sync.WaitGroup{}
for i := 0; i < 3; i++ {
Expand All @@ -74,3 +75,70 @@ func TestReportFileWriter(t *testing.T) {
"[{\"TestField\":\"something\"},{\"TestField\":\"something\"},{\"TestField\":\"something\"}]")
})
}

// TestReportFileWriterJSONL pins the exact bytes a report writer produces in
// ReportFormatJSONL mode: no enclosing array, records separated by a single
// '\n', and no trailing line break after the last record.
//
// All subtests reuse the same file name; each one creates a fresh writer for
// it before writing.
func TestReportFileWriterJSONL(t *testing.T) {
	dir := t.TempDir()

	filename := path.Join(dir, "test.jsonl")
	log := zerolog.Logger{}

	// requireFileContains asserts that the report file holds exactly expected.
	requireFileContains := func(t *testing.T, expected string) {
		// Attribute failures to the calling subtest, not to this helper.
		t.Helper()

		dat, err := os.ReadFile(filename)
		require.NoError(t, err)

		// NOTE(review): debug output; kept because it is the only use of the
		// file's fmt import visible here.
		fmt.Printf("filename: %s\n", filename)

		// Compare as strings so a mismatch prints readable JSON rather than
		// a dump of byte values.
		require.Equal(t, expected, string(dat))
	}

	type testData struct {
		TestField string
	}

	t.Run("Open & Close", func(t *testing.T) {
		rw := reporters.NewReportFileWriter(filename, log, reporters.ReportFormatJSONL)
		rw.Close()

		// JSONL has no framing, so a report without records is an empty file.
		requireFileContains(t, "")
	})

	t.Run("Open & Write One & Close", func(t *testing.T) {
		rw := reporters.NewReportFileWriter(filename, log, reporters.ReportFormatJSONL)
		rw.Write(testData{TestField: "something"})
		rw.Close()

		requireFileContains(t, "{\"TestField\":\"something\"}")
	})

	t.Run("Open & Write Many & Close", func(t *testing.T) {
		rw := reporters.NewReportFileWriter(filename, log, reporters.ReportFormatJSONL)
		rw.Write(testData{TestField: "something0"})
		rw.Write(testData{TestField: "something1"})
		rw.Write(testData{TestField: "something2"})

		rw.Close()

		requireFileContains(t,
			"{\"TestField\":\"something0\"}\n{\"TestField\":\"something1\"}\n{\"TestField\":\"something2\"}")
	})

	t.Run("Open & Write Many in threads & Close", func(t *testing.T) {
		rw := reporters.NewReportFileWriter(filename, log, reporters.ReportFormatJSONL)

		// Every goroutine writes the same record, so the expected output does
		// not depend on the order in which the writes arrive.
		wg := &sync.WaitGroup{}
		for i := 0; i < 3; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				rw.Write(testData{TestField: "something"})
			}()
		}

		wg.Wait()

		rw.Close()

		requireFileContains(t,
			"{\"TestField\":\"something\"}\n{\"TestField\":\"something\"}\n{\"TestField\":\"something\"}")
	})
}

0 comments on commit acbcf4a

Please sign in to comment.