Skip to content

Commit

Permalink
file-plugins: enabling messagepack format (#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
tzaffi authored Aug 29, 2023
1 parent a6cd0b8 commit 1ee2e86
Show file tree
Hide file tree
Showing 25 changed files with 704 additions and 246 deletions.
11 changes: 8 additions & 3 deletions conduit/plugins/exporters/filewriter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

Write block data to files. This plugin works with the file reader plugin to create a simple file-based pipeline.

The genesis file is always exported to a plain JSON file named `genesis.json` regardless of the `FilenamePattern`.

## Configuration

```yml @sample.yaml
name: file_writer
config:
Expand All @@ -13,9 +16,11 @@ config:

# FilenamePattern is the format used to write block files. It uses go
# string formatting and should accept one number for the round.
# If the file has a '.gz' extension, blocks will be gzipped.
# Default: "%[1]d_block.json"
filename-pattern: "%[1]d_block.json"
# To specify JSON encoding, add a '.json' extension to the filename.
# To specify MessagePack encoding, add a '.msgp' extension to the filename.
# If the file has a '.gz' extension, blocks will be gzipped regardless of encoding.
# Default: "%[1]d_block.msgp.gz"
filename-pattern: "%[1]d_block.msgp.gz"

# DropCertificate is used to remove the vote certificate from the block data before writing files.
drop-certificate: true
Expand Down
36 changes: 29 additions & 7 deletions conduit/plugins/exporters/filewriter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@ import (
const (
// PluginName to use when configuring.
PluginName = "file_writer"

// FilePattern is used to name the output files.
FilePattern = "%[1]d_block.json"
FilePattern = "%[1]d_block.msgp.gz"

// GenesisFilename is the name of the genesis file.
GenesisFilename = "genesis.json"
)

// fileExporter holds the state of the file_writer exporter plugin.
type fileExporter struct {
// round is set in Init from initProvider.NextDBRound().
// NOTE(review): presumably the next round Receive expects; confirm against the full Receive body.
round uint64
// cfg is the plugin configuration. Init fills in FilenamePattern (with FilePattern)
// and BlocksDir (with the plugin data directory) when they are left empty.
cfg Config
// gzip is derived in Init from cfg.FilenamePattern via ParseFilenamePattern and
// is passed to EncodeToFile when writing block files.
gzip bool
// format is derived in Init from cfg.FilenamePattern via ParseFilenamePattern and
// is passed to EncodeToFile when writing block files.
format EncodingFormat
// logger is used by Receive to report each block file written.
logger *logrus.Logger
}

Expand All @@ -51,20 +57,34 @@ func (exp *fileExporter) Init(_ context.Context, initProvider data.InitProvider,
if exp.cfg.FilenamePattern == "" {
exp.cfg.FilenamePattern = FilePattern
}
exp.format, exp.gzip, err = ParseFilenamePattern(exp.cfg.FilenamePattern)
if err != nil {
return fmt.Errorf("Init() error: %w", err)
}

// default to the data directory if no override provided.
if exp.cfg.BlocksDir == "" {
exp.cfg.BlocksDir = cfg.DataDir
}
// create block directory
err = os.Mkdir(exp.cfg.BlocksDir, 0755)
if err != nil && errors.Is(err, os.ErrExist) {
// Ignore mkdir if the dir exists
err = nil
} else if err != nil {
if err != nil && !errors.Is(err, os.ErrExist) {
// Ignore mkdir err if the dir exists (case errors.Is(err, os.ErrExist))
return fmt.Errorf("Init() error: %w", err)
}

exp.round = uint64(initProvider.NextDBRound())
return err

genesis := initProvider.GetGenesis()
genesisPath := path.Join(exp.cfg.BlocksDir, GenesisFilename)

// the genesis is always exported as plain JSON:
err = EncodeToFile(genesisPath, genesis, JSONFormat, false)
if err != nil {
return fmt.Errorf("Init() error sending to genesisPath=%s: %w", genesisPath, err)
}

return nil
}

func (exp *fileExporter) Close() error {
Expand All @@ -87,10 +107,12 @@ func (exp *fileExporter) Receive(exportData data.BlockData) error {
}

blockFile := path.Join(exp.cfg.BlocksDir, fmt.Sprintf(exp.cfg.FilenamePattern, exportData.Round()))
err := EncodeJSONToFile(blockFile, exportData, true)

err := EncodeToFile(blockFile, &exportData, exp.format, exp.gzip)
if err != nil {
return fmt.Errorf("Receive(): failed to write file %s: %w", blockFile, err)
}

exp.logger.Infof("Wrote block %d to %s", exportData.Round(), blockFile)
}

Expand Down
113 changes: 71 additions & 42 deletions conduit/plugins/exporters/filewriter/file_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

Expand All @@ -21,29 +20,47 @@ import (
sdk "github.com/algorand/go-algorand-sdk/v2/types"
)

const (
defaultEncodingFormat = MessagepackFormat
defaultIsGzip = true
)

var logger *logrus.Logger
var fileCons = exporters.ExporterConstructorFunc(func() exporters.Exporter {
return &fileExporter{}
})
var configTemplate = "block-dir: %s/blocks\n"
var configTemplatePrefix = "block-dir: %s/blocks\n"
var round = sdk.Round(2)

// init assigns the package-level test logger from test.NewNullLogger.
// The second value returned by NewNullLogger is intentionally discarded.
func init() {
logger, _ = test.NewNullLogger()
}

func getConfig(t *testing.T) (config, tempdir string) {
func getConfigWithoutPattern(t *testing.T) (config, tempdir string) {
tempdir = t.TempDir()
config = fmt.Sprintf(configTemplate, tempdir)
config = fmt.Sprintf(configTemplatePrefix, tempdir)
return
}

// getConfigWithPattern builds a plugin config that extends the one returned by
// getConfigWithoutPattern with a single-quoted filename-pattern entry.
// It returns the resulting YAML text and the temp directory backing the config.
func getConfigWithPattern(t *testing.T, pattern string) (config, tempdir string) {
	base, dir := getConfigWithoutPattern(t)
	return fmt.Sprintf("%sfilename-pattern: '%s'\n", base, pattern), dir
}

// TestDefaults verifies that the default FilePattern parses without error and
// yields the default encoding format and gzip setting declared for these tests.
func TestDefaults(t *testing.T) {
	format, gzip, err := ParseFilenamePattern(FilePattern)
	require.NoError(t, err)
	// testify's Equal takes (expected, actual); keeping that order makes a
	// failure report label the two values correctly.
	require.Equal(t, defaultEncodingFormat, format)
	require.Equal(t, defaultIsGzip, gzip)
}

func TestExporterMetadata(t *testing.T) {
fileExp := fileCons.New()
meta := fileExp.Metadata()
assert.Equal(t, metadata.Name, meta.Name)
assert.Equal(t, metadata.Description, meta.Description)
assert.Equal(t, metadata.Deprecated, meta.Deprecated)
require.Equal(t, metadata.Name, meta.Name)
require.Equal(t, metadata.Description, meta.Description)
require.Equal(t, metadata.Deprecated, meta.Deprecated)
}

func TestExporterInitDefaults(t *testing.T) {
Expand Down Expand Up @@ -87,18 +104,18 @@ func TestExporterInitDefaults(t *testing.T) {
}

func TestExporterInit(t *testing.T) {
config, _ := getConfig(t)
config, _ := getConfigWithPattern(t, "%[1]d_block.json")
fileExp := fileCons.New()
defer fileExp.Close()

// creates a new output file
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), plugins.MakePluginConfig(config), logger)
assert.NoError(t, err)
require.NoError(t, err)
fileExp.Close()

// can open existing file
err = fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), plugins.MakePluginConfig(config), logger)
assert.NoError(t, err)
require.NoError(t, err)
fileExp.Close()
}

Expand Down Expand Up @@ -155,55 +172,67 @@ func sendData(t *testing.T, fileExp exporters.Exporter, config string, numRounds
}

func TestExporterReceive(t *testing.T) {
config, tempdir := getConfig(t)
fileExp := fileCons.New()
numRounds := 5
sendData(t, fileExp, config, numRounds)

// block data is valid
for i := 0; i < 5; i++ {
filename := fmt.Sprintf(FilePattern, i)
path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
require.FileExists(t, path)

blockBytes, err := os.ReadFile(path)
require.NoError(t, err)
assert.NotContains(t, string(blockBytes), " 0: ")
patterns := []string{
"%[1]d_block.json",
"%[1]d_block.json.gz",
"%[1]d_block.msgp",
"%[1]d_block.msgp.gz",
}
for _, pattern := range patterns {
pattern := pattern
t.Run(pattern, func(t *testing.T) {
t.Parallel()

var blockData data.BlockData
err = DecodeJSONFromFile(path, &blockData, true)
require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
require.NoError(t, err)
require.NotNil(t, blockData.Certificate)
format, isGzip, err := ParseFilenamePattern(pattern)
require.NoError(t, err)
config, tempdir := getConfigWithPattern(t, pattern)
fileExp := fileCons.New()
numRounds := 5
sendData(t, fileExp, config, numRounds)

// block data is valid
for i := 0; i < 5; i++ {
filename := fmt.Sprintf(pattern, i)
path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
require.FileExists(t, path)

blockBytes, err := os.ReadFile(path)
require.NoError(t, err)
require.NotContains(t, string(blockBytes), " 0: ")

var blockData data.BlockData
err = DecodeFromFile(path, &blockData, format, isGzip)
require.NoError(t, err)
require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
require.NotNil(t, blockData.Certificate)
}
})
}
}

func TestExporterClose(t *testing.T) {
config, _ := getConfig(t)
config, _ := getConfigWithoutPattern(t)
fileExp := fileCons.New()
rnd := sdk.Round(0)
fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&rnd, nil, nil), plugins.MakePluginConfig(config), logger)
require.NoError(t, fileExp.Close())
}

func TestPatternOverride(t *testing.T) {
config, tempdir := getConfig(t)
func TestPatternDefault(t *testing.T) {
config, tempdir := getConfigWithoutPattern(t)
fileExp := fileCons.New()

patternOverride := "PREFIX_%[1]d_block.json"
config = fmt.Sprintf("%sfilename-pattern: '%s'\n", config, patternOverride)

numRounds := 5
sendData(t, fileExp, config, numRounds)

// block data is valid
for i := 0; i < 5; i++ {
filename := fmt.Sprintf(patternOverride, i)
filename := fmt.Sprintf(FilePattern, i)
path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
assert.FileExists(t, path)
require.FileExists(t, path)

var blockData data.BlockData
err := DecodeJSONFromFile(path, &blockData, true)
err := DecodeFromFile(path, &blockData, defaultEncodingFormat, defaultIsGzip)
require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
require.NoError(t, err)
require.NotNil(t, blockData.Certificate)
Expand All @@ -227,10 +256,10 @@ func TestDropCertificate(t *testing.T) {
for i := 0; i < numRounds; i++ {
filename := fmt.Sprintf(FilePattern, i)
path := fmt.Sprintf("%s/%s", tempdir, filename)
assert.FileExists(t, path)
require.FileExists(t, path)
var blockData data.BlockData
err := DecodeJSONFromFile(path, &blockData, true)
assert.NoError(t, err)
assert.Nil(t, blockData.Certificate)
err := DecodeFromFile(path, &blockData, defaultEncodingFormat, defaultIsGzip)
require.NoError(t, err)
require.Nil(t, blockData.Certificate)
}
}
8 changes: 5 additions & 3 deletions conduit/plugins/exporters/filewriter/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ config:

# FilenamePattern is the format used to write block files. It uses go
# string formatting and should accept one number for the round.
# If the file has a '.gz' extension, blocks will be gzipped.
# Default: "%[1]d_block.json"
filename-pattern: "%[1]d_block.json"
# To specify JSON encoding, add a '.json' extension to the filename.
# To specify MessagePack encoding, add a '.msgp' extension to the filename.
# If the file has a '.gz' extension, blocks will be gzipped regardless of encoding.
# Default: "%[1]d_block.msgp.gz"
filename-pattern: "%[1]d_block.msgp.gz"

# DropCertificate is used to remove the vote certificate from the block data before writing files.
drop-certificate: true
Loading

0 comments on commit 1ee2e86

Please sign in to comment.