Skip to content

Commit

Permalink
msgp in file plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
Zeph Grunschlag committed Aug 18, 2023
1 parent 1816271 commit f2bc1a4
Show file tree
Hide file tree
Showing 25 changed files with 761 additions and 256 deletions.
32 changes: 12 additions & 20 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ import (
"fmt"
"net/http"
"os"
"path"
"runtime/pprof"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
yaml "gopkg.in/yaml.v3"

sdk "github.com/algorand/go-algorand-sdk/v2/types"

Expand Down Expand Up @@ -103,29 +101,23 @@ func (p *pipelineImpl) registerPluginMetricsCallbacks() {
}
}

// makeConfig creates a plugin config from a name and config pair.
// configWithLogger creates a plugin config from a name and config pair.
// It also creates a logger for the plugin and configures it using the pipeline's log settings.
func (p *pipelineImpl) makeConfig(cfg data.NameConfigPair, pluginType plugins.PluginType) (*log.Logger, plugins.PluginConfig, error) {
configs, err := yaml.Marshal(cfg.Config)
func (p *pipelineImpl) configWithLogger(cfg data.NameConfigPair, pluginType plugins.PluginType) (*log.Logger, plugins.PluginConfig, error) {
var dataDir string
if p.cfg.ConduitArgs != nil {
dataDir = p.cfg.ConduitArgs.ConduitDataDir
}
config, err := pluginType.GetConfig(cfg, dataDir)
if err != nil {
return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig(): could not serialize config: %w", err)
return nil, plugins.PluginConfig{}, fmt.Errorf("configWithLogger(): unable to create plugin config: %w", err)
}

lgr := log.New()
lgr.SetOutput(p.logger.Out)
lgr.SetLevel(p.logger.Level)
lgr.SetFormatter(makePluginLogFormatter(string(pluginType), cfg.Name))

var config plugins.PluginConfig
config.Config = string(configs)
if p.cfg != nil && p.cfg.ConduitArgs != nil {
config.DataDir = path.Join(p.cfg.ConduitArgs.ConduitDataDir, fmt.Sprintf("%s_%s", pluginType, cfg.Name))
err = os.MkdirAll(config.DataDir, os.ModePerm)
if err != nil {
return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig: unable to create plugin data directory: %w", err)
}
}

return lgr, config, nil
}

Expand Down Expand Up @@ -171,7 +163,7 @@ func (p *pipelineImpl) pluginRoundOverride() (uint64, error) {
var pluginOverride uint64
var pluginOverrideName string // cache this in case of error.
for _, part := range parts {
_, config, err := p.makeConfig(part.cfg, part.t)
_, config, err := p.configWithLogger(part.cfg, part.t)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -302,7 +294,7 @@ func (p *pipelineImpl) Init() error {

// Initialize Importer
{
importerLogger, pluginConfig, err := p.makeConfig(p.cfg.Importer, plugins.Importer)
importerLogger, pluginConfig, err := p.configWithLogger(p.cfg.Importer, plugins.Importer)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not make %s config: %w", p.cfg.Importer.Name, err)
}
Expand Down Expand Up @@ -335,7 +327,7 @@ func (p *pipelineImpl) Init() error {
// Initialize Processors
for idx, processor := range p.processors {
ncPair := p.cfg.Processors[idx]
logger, config, err := p.makeConfig(ncPair, plugins.Processor)
logger, config, err := p.configWithLogger(ncPair, plugins.Processor)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair, err)
}
Expand All @@ -348,7 +340,7 @@ func (p *pipelineImpl) Init() error {

// Initialize Exporter
{
logger, config, err := p.makeConfig(p.cfg.Exporter, plugins.Exporter)
logger, config, err := p.configWithLogger(p.cfg.Exporter, plugins.Exporter)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", p.cfg.Exporter.Name, err)
}
Expand Down
38 changes: 35 additions & 3 deletions conduit/plugins/exporters/filewriter/file_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,24 @@ import (
const (
// PluginName to use when configuring.
PluginName = "file_writer"

// FilePattern is used to name the output files.
FilePattern = "%[1]d_block.json"
FilePattern = "%[1]d_block.msgp.gz"
)

type EncodingFormat byte

const (
MessagepackFormat EncodingFormat = iota
JSONFormat
UnrecognizedFormat
)

type fileExporter struct {
round uint64
cfg Config
gzip bool
format EncodingFormat
logger *logrus.Logger
}

Expand All @@ -51,6 +62,11 @@ func (exp *fileExporter) Init(_ context.Context, initProvider data.InitProvider,
if exp.cfg.FilenamePattern == "" {
exp.cfg.FilenamePattern = FilePattern
}
exp.format, exp.gzip, err = ParseFilenamePattern(exp.cfg.FilenamePattern)
if err != nil {
return fmt.Errorf("Init() error: %w", err)
}

// default to the data directory if no override provided.
if exp.cfg.BlocksDir == "" {
exp.cfg.BlocksDir = cfg.DataDir
Expand All @@ -64,7 +80,21 @@ func (exp *fileExporter) Init(_ context.Context, initProvider data.InitProvider,
return fmt.Errorf("Init() error: %w", err)
}
exp.round = uint64(initProvider.NextDBRound())
return err

// export the genesis as well in the same format
genesis := initProvider.GetGenesis()
genesisFile, err := GenesisFilename(exp.format, exp.gzip)
if err != nil {
return fmt.Errorf("Init() error: %w", err)
}

genesisPath := path.Join(exp.cfg.BlocksDir, genesisFile)
err = EncodeToFile(genesisPath, genesis, exp.format, exp.gzip)
if err != nil {
return fmt.Errorf("Init() error sending to genesisPath=%s: %w", genesisPath, err)
}

return nil
}

func (exp *fileExporter) Close() error {
Expand All @@ -87,10 +117,12 @@ func (exp *fileExporter) Receive(exportData data.BlockData) error {
}

blockFile := path.Join(exp.cfg.BlocksDir, fmt.Sprintf(exp.cfg.FilenamePattern, exportData.Round()))
err := EncodeJSONToFile(blockFile, exportData, true)

err := EncodeToFile(blockFile, &exportData, exp.format, exp.gzip)
if err != nil {
return fmt.Errorf("Receive(): failed to write file %s: %w", blockFile, err)
}

exp.logger.Infof("Wrote block %d to %s", exportData.Round(), blockFile)
}

Expand Down
111 changes: 69 additions & 42 deletions conduit/plugins/exporters/filewriter/file_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

Expand All @@ -21,29 +20,45 @@ import (
sdk "github.com/algorand/go-algorand-sdk/v2/types"
)

const (
defaultEncodingFormat = MessagepackFormat
defaultIsGzip = true
)

var logger *logrus.Logger
var fileCons = exporters.ExporterConstructorFunc(func() exporters.Exporter {
return &fileExporter{}
})
var configTemplate = "block-dir: %s/blocks\n"
var configTemplatePrefix = "block-dir: %s/blocks\n"
var round = sdk.Round(2)

func init() {
logger, _ = test.NewNullLogger()
}

func getConfig(t *testing.T) (config, tempdir string) {
func getConfigWithoutPattern(t *testing.T) (config, tempdir string) {
tempdir = t.TempDir()
config = fmt.Sprintf(configTemplate, tempdir)
config = fmt.Sprintf(configTemplatePrefix, tempdir)
return
}

func getConfigWithPattern(t *testing.T, pattern string) (config, tempdir string) {
config, tempdir = getConfigWithoutPattern(t)
config = fmt.Sprintf("%sfilename-pattern: '%s'\n", config, pattern)
return
}

func TestDefaults(t *testing.T) {
require.Equal(t, defaultEncodingFormat, MessagepackFormat)
require.Equal(t, defaultIsGzip, true)
}

func TestExporterMetadata(t *testing.T) {
fileExp := fileCons.New()
meta := fileExp.Metadata()
assert.Equal(t, metadata.Name, meta.Name)
assert.Equal(t, metadata.Description, meta.Description)
assert.Equal(t, metadata.Deprecated, meta.Deprecated)
require.Equal(t, metadata.Name, meta.Name)
require.Equal(t, metadata.Description, meta.Description)
require.Equal(t, metadata.Deprecated, meta.Deprecated)
}

func TestExporterInitDefaults(t *testing.T) {
Expand Down Expand Up @@ -87,18 +102,18 @@ func TestExporterInitDefaults(t *testing.T) {
}

func TestExporterInit(t *testing.T) {
config, _ := getConfig(t)
config, _ := getConfigWithPattern(t, "%[1]d_block.json")
fileExp := fileCons.New()
defer fileExp.Close()

// creates a new output file
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), plugins.MakePluginConfig(config), logger)
assert.NoError(t, err)
require.NoError(t, err)
fileExp.Close()

// can open existing file
err = fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), plugins.MakePluginConfig(config), logger)
assert.NoError(t, err)
require.NoError(t, err)
fileExp.Close()
}

Expand Down Expand Up @@ -155,55 +170,67 @@ func sendData(t *testing.T, fileExp exporters.Exporter, config string, numRounds
}

func TestExporterReceive(t *testing.T) {
config, tempdir := getConfig(t)
fileExp := fileCons.New()
numRounds := 5
sendData(t, fileExp, config, numRounds)

// block data is valid
for i := 0; i < 5; i++ {
filename := fmt.Sprintf(FilePattern, i)
path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
require.FileExists(t, path)

blockBytes, err := os.ReadFile(path)
require.NoError(t, err)
assert.NotContains(t, string(blockBytes), " 0: ")
patterns := []string{
"%[1]d_block.json",
"%[1]d_block.json.gz",
"%[1]d_block.msgp",
"%[1]d_block.msgp.gz",
}
for _, pattern := range patterns {
pattern := pattern
t.Run(pattern, func(t *testing.T) {
t.Parallel()

var blockData data.BlockData
err = DecodeJSONFromFile(path, &blockData, true)
require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
require.NoError(t, err)
require.NotNil(t, blockData.Certificate)
format, isGzip, err := ParseFilenamePattern(pattern)
require.NoError(t, err)
config, tempdir := getConfigWithPattern(t, pattern)
fileExp := fileCons.New()
numRounds := 5
sendData(t, fileExp, config, numRounds)

// block data is valid
for i := 0; i < 5; i++ {
filename := fmt.Sprintf(pattern, i)
path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
require.FileExists(t, path)

blockBytes, err := os.ReadFile(path)
require.NoError(t, err)
require.NotContains(t, string(blockBytes), " 0: ")

var blockData data.BlockData
err = DecodeFromFile(path, &blockData, format, isGzip)
require.NoError(t, err)
require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
require.NotNil(t, blockData.Certificate)
}
})
}
}

func TestExporterClose(t *testing.T) {
config, _ := getConfig(t)
config, _ := getConfigWithoutPattern(t)
fileExp := fileCons.New()
rnd := sdk.Round(0)
fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&rnd, nil, nil), plugins.MakePluginConfig(config), logger)
require.NoError(t, fileExp.Close())
}

func TestPatternOverride(t *testing.T) {
config, tempdir := getConfig(t)
func TestPatternDefault(t *testing.T) {
config, tempdir := getConfigWithoutPattern(t)
fileExp := fileCons.New()

patternOverride := "PREFIX_%[1]d_block.json"
config = fmt.Sprintf("%sfilename-pattern: '%s'\n", config, patternOverride)

numRounds := 5
sendData(t, fileExp, config, numRounds)

// block data is valid
for i := 0; i < 5; i++ {
filename := fmt.Sprintf(patternOverride, i)
filename := fmt.Sprintf(FilePattern, i)
path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
assert.FileExists(t, path)
require.FileExists(t, path)

var blockData data.BlockData
err := DecodeJSONFromFile(path, &blockData, true)
err := DecodeFromFile(path, &blockData, defaultEncodingFormat, defaultIsGzip)
require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
require.NoError(t, err)
require.NotNil(t, blockData.Certificate)
Expand All @@ -227,10 +254,10 @@ func TestDropCertificate(t *testing.T) {
for i := 0; i < numRounds; i++ {
filename := fmt.Sprintf(FilePattern, i)
path := fmt.Sprintf("%s/%s", tempdir, filename)
assert.FileExists(t, path)
require.FileExists(t, path)
var blockData data.BlockData
err := DecodeJSONFromFile(path, &blockData, true)
assert.NoError(t, err)
assert.Nil(t, blockData.Certificate)
err := DecodeFromFile(path, &blockData, defaultEncodingFormat, defaultIsGzip)
require.NoError(t, err)
require.Nil(t, blockData.Certificate)
}
}
4 changes: 2 additions & 2 deletions conduit/plugins/exporters/filewriter/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ config:
# FilenamePattern is the format used to write block files. It uses go
# string formatting and should accept one number for the round.
# If the file has a '.gz' extension, blocks will be gzipped.
# Default: "%[1]d_block.json"
filename-pattern: "%[1]d_block.json"
# Default: "%[1]d_block.msgp.gz"
filename-pattern: "%[1]d_block.msgp.gz"

# DropCertificate is used to remove the vote certificate from the block data before writing files.
drop-certificate: true
Loading

0 comments on commit f2bc1a4

Please sign in to comment.