file-plugins: enabling messagepack format #142

Merged
merged 24 commits on Aug 29, 2023
Changes from 8 commits
32 changes: 12 additions & 20 deletions conduit/pipeline/pipeline.go
@@ -6,15 +6,13 @@
"fmt"
"net/http"
"os"
"path"
"runtime/pprof"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
yaml "gopkg.in/yaml.v3"

sdk "github.com/algorand/go-algorand-sdk/v2/types"

@@ -103,29 +101,23 @@
}
}

// makeConfig creates a plugin config from a name and config pair.
// configWithLogger creates a plugin config from a name and config pair.
Contributor: why rename this? Seems unrelated to the rest of the PR

Contributor (Author): In order to set up the integration test, I wanted to break out the pure config-generating portion into a new method, pluginType.GetConfig(). With this refactoring, it's clearer that this method does 2 things:

  1. sets up the plugin's config
  2. returns a logger that inherits the pipeline's logger setup

These seemed sufficiently unrelated to me that I thought it was worth the rename.

However, renaming is not at all crucial.
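
A rough, self-contained sketch of the split described above, using stand-in types. The real GetConfig hangs off plugins.PluginType in this PR; the body below is only an assumption based on the old makeConfig code further down, not the actual implementation.

```go
package main

import (
	"fmt"
	"os"
	"path"

	yaml "gopkg.in/yaml.v3"
)

// Stand-ins for the Conduit types involved (illustrative only).
type PluginType string

type NameConfigPair struct {
	Name   string
	Config map[string]interface{}
}

type PluginConfig struct {
	DataDir string
	Config  string
}

// GetConfig covers only the pure config-generation concern: serialize the
// plugin's YAML config and create a per-plugin data directory.
func (pt PluginType) GetConfig(cfg NameConfigPair, dataDir string) (PluginConfig, error) {
	raw, err := yaml.Marshal(cfg.Config)
	if err != nil {
		return PluginConfig{}, fmt.Errorf("GetConfig(): could not serialize config: %w", err)
	}
	pc := PluginConfig{Config: string(raw)}
	if dataDir != "" {
		pc.DataDir = path.Join(dataDir, fmt.Sprintf("%s_%s", pt, cfg.Name))
		if err := os.MkdirAll(pc.DataDir, os.ModePerm); err != nil {
			return PluginConfig{}, fmt.Errorf("GetConfig(): unable to create plugin data directory: %w", err)
		}
	}
	return pc, nil
}

func main() {
	// configWithLogger would then layer the second concern (a logger that inherits
	// the pipeline's output, level, and per-plugin formatter) on top of this.
	pc, err := PluginType("exporter").GetConfig(
		NameConfigPair{Name: "file_writer", Config: map[string]interface{}{"block-dir": "/tmp/blocks"}},
		"/tmp/conduit-data")
	if err != nil {
		panic(err)
	}
	fmt.Println(pc.DataDir) // /tmp/conduit-data/exporter_file_writer
}
```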

Contributor (Author):
// It also creates a logger for the plugin and configures it using the pipeline's log settings.
func (p *pipelineImpl) makeConfig(cfg data.NameConfigPair, pluginType plugins.PluginType) (*log.Logger, plugins.PluginConfig, error) {
configs, err := yaml.Marshal(cfg.Config)
func (p *pipelineImpl) configWithLogger(cfg data.NameConfigPair, pluginType plugins.PluginType) (*log.Logger, plugins.PluginConfig, error) {
var dataDir string
if p.cfg.ConduitArgs != nil {
dataDir = p.cfg.ConduitArgs.ConduitDataDir
}
config, err := pluginType.GetConfig(cfg, dataDir)
if err != nil {
return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig(): could not serialize config: %w", err)
return nil, plugins.PluginConfig{}, fmt.Errorf("configWithLogger(): unable to create plugin config: %w", err)
}

lgr := log.New()
lgr.SetOutput(p.logger.Out)
lgr.SetLevel(p.logger.Level)
lgr.SetFormatter(makePluginLogFormatter(string(pluginType), cfg.Name))

var config plugins.PluginConfig
config.Config = string(configs)
if p.cfg != nil && p.cfg.ConduitArgs != nil {
config.DataDir = path.Join(p.cfg.ConduitArgs.ConduitDataDir, fmt.Sprintf("%s_%s", pluginType, cfg.Name))
err = os.MkdirAll(config.DataDir, os.ModePerm)
if err != nil {
return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig: unable to create plugin data directory: %w", err)
}
}

return lgr, config, nil
}

@@ -171,10 +163,10 @@
var pluginOverride uint64
var pluginOverrideName string // cache this in case of error.
for _, part := range parts {
_, config, err := p.makeConfig(part.cfg, part.t)
_, config, err := p.configWithLogger(part.cfg, part.t)
if err != nil {
return 0, err
}

rnd, err := part.RoundRequest(config)
if err != nil {
return 0, err
@@ -204,15 +196,15 @@
telemetryConfig := telemetry.MakeTelemetryConfig(p.cfg.Telemetry.URI, p.cfg.Telemetry.Index, p.cfg.Telemetry.UserName, p.cfg.Telemetry.Password)
telemetryClient, err := telemetry.MakeOpenSearchClient(telemetryConfig)
if err != nil {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
}

p.logger.Infof("Telemetry initialized with URI: %s", telemetryConfig.URI)

// If GUID is not in metadata, save it. Otherwise, use the GUID from metadata.
if p.pipelineMetadata.TelemetryID == "" {
p.pipelineMetadata.TelemetryID = telemetryClient.TelemetryConfig.GUID
} else {
telemetryClient.TelemetryConfig.GUID = p.pipelineMetadata.TelemetryID

}

return telemetryClient, nil
@@ -232,12 +224,12 @@
var err error
profFile, err := os.Create(p.cfg.CPUProfile)
if err != nil {
return fmt.Errorf("Pipeline.Init(): unable to create profile: %w", err)

}
p.profFile = profFile
err = pprof.StartCPUProfile(profFile)
if err != nil {
return fmt.Errorf("Pipeline.Init(): unable to start pprof: %w", err)

}
}

@@ -286,7 +278,7 @@
var telemetryErr error
telemetryClient, telemetryErr = p.initializeTelemetry()
if telemetryErr != nil {
p.logger.Warnf("Telemetry initialization failed, continuing without telemetry: %s", telemetryErr)

} else {
// Try sending a startup event. If it fails, log a warning and continue
event := telemetryClient.MakeTelemetryStartupEvent()
@@ -302,18 +294,18 @@

// Initialize Importer
{
importerLogger, pluginConfig, err := p.makeConfig(p.cfg.Importer, plugins.Importer)
importerLogger, pluginConfig, err := p.configWithLogger(p.cfg.Importer, plugins.Importer)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not make %s config: %w", p.cfg.Importer.Name, err)
}

err = p.importer.Init(p.ctx, *p.initProvider, pluginConfig, importerLogger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize importer (%s): %w", p.cfg.Importer.Name, err)
}

genesis, err := p.importer.GetGenesis()
if err != nil {
return fmt.Errorf("Pipeline.GetGenesis(): could not obtain Genesis from the importer (%s): %w", p.cfg.Importer.Name, err)
}

(*p.initProvider).SetGenesis(genesis)

// write pipeline metadata
@@ -326,8 +318,8 @@
p.pipelineMetadata.Network = genesis.Network
err = p.pipelineMetadata.encodeToFile(p.cfg.ConduitArgs.ConduitDataDir)
if err != nil {
return fmt.Errorf("Pipeline.Init() failed to write metadata to file: %w", err)
}

p.logger.Infof("Initialized Importer: %s", p.cfg.Importer.Name)
}
@@ -335,27 +327,27 @@
// Initialize Processors
for idx, processor := range p.processors {
ncPair := p.cfg.Processors[idx]
logger, config, err := p.makeConfig(ncPair, plugins.Processor)
logger, config, err := p.configWithLogger(ncPair, plugins.Processor)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair, err)

}
err = processor.Init(p.ctx, *p.initProvider, config, logger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair.Name, err)

}
p.logger.Infof("Initialized Processor: %s", ncPair.Name)
}

// Initialize Exporter
{
logger, config, err := p.makeConfig(p.cfg.Exporter, plugins.Exporter)
logger, config, err := p.configWithLogger(p.cfg.Exporter, plugins.Exporter)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", p.cfg.Exporter.Name, err)
}

err = p.exporter.Init(p.ctx, *p.initProvider, config, logger)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize Exporter (%s): %w", p.cfg.Exporter.Name, err)
}

p.logger.Infof("Initialized Exporter: %s", p.cfg.Exporter.Name)
}

@@ -388,20 +380,20 @@
}
}

if err := p.importer.Close(); err != nil {

// Log and continue on closing the rest of the pipeline
p.logger.Errorf("Pipeline.Stop(): Importer (%s) error on close: %v", p.importer.Metadata().Name, err)

}

for _, processor := range p.processors {
if err := processor.Close(); err != nil {

// Log and continue on closing the rest of the pipeline
p.logger.Errorf("Pipeline.Stop(): Processor (%s) error on close: %v", processor.Metadata().Name, err)

}
}

if err := p.exporter.Close(); err != nil {
p.logger.Errorf("Pipeline.Stop(): Exporter (%s) error on close: %v", p.exporter.Metadata().Name, err)

}
}

@@ -574,7 +566,7 @@
return nil, fmt.Errorf("MakePipeline(): could not build importer '%s': %w", importerName, err)
}

pipeline.importer = importerConstructor.New()

logger.Infof("Found Importer: %s", importerName)

// ---
@@ -582,12 +574,12 @@
for _, processorConfig := range cfg.Processors {
processorName := processorConfig.Name

processorConstructor, err := processors.ProcessorConstructorByName(processorName)

if err != nil {
return nil, fmt.Errorf("MakePipeline(): could not build processor '%s': %w", processorName, err)
}

pipeline.processors = append(pipeline.processors, processorConstructor.New())

logger.Infof("Found Processor: %s", processorName)
}

@@ -595,12 +587,12 @@

exporterName := cfg.Exporter.Name

exporterConstructor, err := exporters.ExporterConstructorByName(exporterName)

if err != nil {
return nil, fmt.Errorf("MakePipeline(): could not build exporter '%s': %w", exporterName, err)
}

pipeline.exporter = exporterConstructor.New()

logger.Infof("Found Exporter: %s", exporterName)

return pipeline, nil
@@ -612,22 +604,22 @@
logger.Infof("Creating PID file at: %s", pidFilePath)
fout, err := os.Create(pidFilePath)
if err != nil {
err = fmt.Errorf("%s: could not create pid file, %v", pidFilePath, err)
logger.Error(err)
return err
}

if _, err = fmt.Fprintf(fout, "%d", os.Getpid()); err != nil {
err = fmt.Errorf("%s: could not write pid file, %v", pidFilePath, err)
logger.Error(err)
return err
}

err = fout.Close()
if err != nil {
err = fmt.Errorf("%s: could not close pid file, %v", pidFilePath, err)
logger.Error(err)
return err
}

return err
}
9 changes: 6 additions & 3 deletions conduit/plugins/exporters/filewriter/README.md
@@ -3,6 +3,7 @@
Write block data to files. This plugin works with the file reader plugin to create a simple file-based pipeline.

## Configuration

```yml @sample.yaml
name: file_writer
config:
@@ -13,9 +14,11 @@ config:

# FilenamePattern is the format used to write block files. It uses go
# string formatting and should accept one number for the round.
# If the file has a '.gz' extension, blocks will be gzipped.
# Default: "%[1]d_block.json"
filename-pattern: "%[1]d_block.json"
# To specify JSON encoding, add a '.json' extension to the filename.
# To specify MessagePack encoding, add a '.msgp' extension to the filename.
# If the file has a '.gz' extension, blocks will be gzipped regardless of encoding.
# Default: "%[1]d_block.msgp.gz"
filename-pattern: "%[1]d_block.msgp.gz"

# DropCertificate is used to remove the vote certificate from the block data before writing files.
drop-certificate: true
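For reference, the four pattern shapes exercised by the exporter's tests map onto encodings as follows under the rules above (illustrative values only):

```yml
filename-pattern: "%[1]d_block.msgp.gz"   # MessagePack, gzipped (the default)
# filename-pattern: "%[1]d_block.msgp"    # MessagePack, uncompressed
# filename-pattern: "%[1]d_block.json.gz" # JSON, gzipped
# filename-pattern: "%[1]d_block.json"    # JSON, uncompressed
```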
37 changes: 30 additions & 7 deletions conduit/plugins/exporters/filewriter/file_exporter.go
@@ -18,13 +18,16 @@
const (
// PluginName to use when configuring.
PluginName = "file_writer"

// FilePattern is used to name the output files.
FilePattern = "%[1]d_block.json"
FilePattern = "%[1]d_block.msgp.gz"
)

type fileExporter struct {
round uint64
cfg Config
gzip bool
format EncodingFormat
logger *logrus.Logger
}

@@ -51,20 +54,38 @@
if exp.cfg.FilenamePattern == "" {
exp.cfg.FilenamePattern = FilePattern
}
exp.format, exp.gzip, err = ParseFilenamePattern(exp.cfg.FilenamePattern)
if err != nil {
return fmt.Errorf("Init() error: %w", err)
}

// default to the data directory if no override provided.
if exp.cfg.BlocksDir == "" {
exp.cfg.BlocksDir = cfg.DataDir
}
// create block directory
err = os.Mkdir(exp.cfg.BlocksDir, 0755)
if err != nil && errors.Is(err, os.ErrExist) {
// Ignore mkdir if the dir exists
err = nil
} else if err != nil {
if err != nil && !errors.Is(err, os.ErrExist) {
// Ignore mkdir err if the dir exists (case errors.Is(err, os.ErrExist))

return fmt.Errorf("Init() error: %w", err)
}

exp.round = uint64(initProvider.NextDBRound())
return err

// export the genesis as well in the same format
genesis := initProvider.GetGenesis()
genesisFile, err := GenesisFilename(exp.format, exp.gzip)
if err != nil {
return fmt.Errorf("Init() error: %w", err)
}

genesisPath := path.Join(exp.cfg.BlocksDir, genesisFile)
err = EncodeToFile(genesisPath, genesis, exp.format, exp.gzip)
if err != nil {
return fmt.Errorf("Init() error sending to genesisPath=%s: %w", genesisPath, err)
}

return nil
}

func (exp *fileExporter) Close() error {
Expand All @@ -87,10 +108,12 @@
}

blockFile := path.Join(exp.cfg.BlocksDir, fmt.Sprintf(exp.cfg.FilenamePattern, exportData.Round()))
err := EncodeJSONToFile(blockFile, exportData, true)

err := EncodeToFile(blockFile, &exportData, exp.format, exp.gzip)
if err != nil {
return fmt.Errorf("Receive(): failed to write file %s: %w", blockFile, err)
}

exp.logger.Infof("Wrote block %d to %s", exportData.Round(), blockFile)
}

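A short usage sketch of the helpers this exporter now relies on (ParseFilenamePattern, EncodeToFile, DecodeFromFile), with the signatures as they appear in this diff. The import paths and standalone scaffolding are assumptions for illustration, not part of the PR:

```go
package main

import (
	"fmt"
	"os"
	"path"

	"github.com/algorand/conduit/conduit/data"
	"github.com/algorand/conduit/conduit/plugins/exporters/filewriter"
)

func main() {
	// The filename pattern selects both the encoding and whether output is gzipped.
	pattern := "%[1]d_block.msgp.gz"
	format, isGzip, err := filewriter.ParseFilenamePattern(pattern)
	if err != nil {
		panic(err)
	}

	blocksDir := "/tmp/blocks"
	if err := os.MkdirAll(blocksDir, 0755); err != nil {
		panic(err)
	}

	// Write one round's data the same way fileExporter.Receive does.
	block := data.BlockData{}
	blockFile := path.Join(blocksDir, fmt.Sprintf(pattern, 7))
	if err := filewriter.EncodeToFile(blockFile, &block, format, isGzip); err != nil {
		panic(err)
	}

	// Read it back with the matching decoder, as the tests do.
	var roundTrip data.BlockData
	if err := filewriter.DecodeFromFile(blockFile, &roundTrip, format, isGzip); err != nil {
		panic(err)
	}
	fmt.Println("round-tripped block for round", roundTrip.BlockHeader.Round)
}
```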
111 changes: 69 additions & 42 deletions conduit/plugins/exporters/filewriter/file_exporter_test.go
@@ -9,7 +9,6 @@ import (

"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"

@@ -21,29 +20,45 @@
sdk "github.com/algorand/go-algorand-sdk/v2/types"
)

const (
defaultEncodingFormat = MessagepackFormat
defaultIsGzip = true
)

var logger *logrus.Logger
var fileCons = exporters.ExporterConstructorFunc(func() exporters.Exporter {
return &fileExporter{}
})
var configTemplate = "block-dir: %s/blocks\n"
var configTemplatePrefix = "block-dir: %s/blocks\n"
var round = sdk.Round(2)

func init() {
logger, _ = test.NewNullLogger()
}

func getConfig(t *testing.T) (config, tempdir string) {
func getConfigWithoutPattern(t *testing.T) (config, tempdir string) {
tempdir = t.TempDir()
config = fmt.Sprintf(configTemplate, tempdir)
config = fmt.Sprintf(configTemplatePrefix, tempdir)
return
}

func getConfigWithPattern(t *testing.T, pattern string) (config, tempdir string) {
config, tempdir = getConfigWithoutPattern(t)
config = fmt.Sprintf("%sfilename-pattern: '%s'\n", config, pattern)
return
}

func TestDefaults(t *testing.T) {
require.Equal(t, defaultEncodingFormat, MessagepackFormat)
require.Equal(t, defaultIsGzip, true)
}

func TestExporterMetadata(t *testing.T) {
fileExp := fileCons.New()
meta := fileExp.Metadata()
assert.Equal(t, metadata.Name, meta.Name)
assert.Equal(t, metadata.Description, meta.Description)
assert.Equal(t, metadata.Deprecated, meta.Deprecated)
require.Equal(t, metadata.Name, meta.Name)
require.Equal(t, metadata.Description, meta.Description)
require.Equal(t, metadata.Deprecated, meta.Deprecated)
}

func TestExporterInitDefaults(t *testing.T) {
@@ -87,18 +102,18 @@ func TestExporterInitDefaults(t *testing.T) {
}

func TestExporterInit(t *testing.T) {
config, _ := getConfig(t)
config, _ := getConfigWithPattern(t, "%[1]d_block.json")
fileExp := fileCons.New()
defer fileExp.Close()

// creates a new output file
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), plugins.MakePluginConfig(config), logger)
assert.NoError(t, err)
require.NoError(t, err)
fileExp.Close()

// can open existing file
err = fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), plugins.MakePluginConfig(config), logger)
assert.NoError(t, err)
require.NoError(t, err)
fileExp.Close()
}

@@ -155,55 +170,67 @@ func sendData(t *testing.T, fileExp exporters.Exporter, config string, numRounds
}

func TestExporterReceive(t *testing.T) {
config, tempdir := getConfig(t)
fileExp := fileCons.New()
numRounds := 5
sendData(t, fileExp, config, numRounds)

// block data is valid
for i := 0; i < 5; i++ {
filename := fmt.Sprintf(FilePattern, i)
path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
require.FileExists(t, path)

blockBytes, err := os.ReadFile(path)
require.NoError(t, err)
assert.NotContains(t, string(blockBytes), " 0: ")
patterns := []string{
"%[1]d_block.json",
"%[1]d_block.json.gz",
"%[1]d_block.msgp",
"%[1]d_block.msgp.gz",
}
for _, pattern := range patterns {
pattern := pattern
t.Run(pattern, func(t *testing.T) {
t.Parallel()

var blockData data.BlockData
err = DecodeJSONFromFile(path, &blockData, true)
require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
require.NoError(t, err)
require.NotNil(t, blockData.Certificate)
format, isGzip, err := ParseFilenamePattern(pattern)
require.NoError(t, err)
config, tempdir := getConfigWithPattern(t, pattern)
fileExp := fileCons.New()
numRounds := 5
sendData(t, fileExp, config, numRounds)

// block data is valid
for i := 0; i < 5; i++ {
filename := fmt.Sprintf(pattern, i)
path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
require.FileExists(t, path)

blockBytes, err := os.ReadFile(path)
require.NoError(t, err)
require.NotContains(t, string(blockBytes), " 0: ")

var blockData data.BlockData
err = DecodeFromFile(path, &blockData, format, isGzip)
require.NoError(t, err)
require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
require.NotNil(t, blockData.Certificate)
}
})
}
}

func TestExporterClose(t *testing.T) {
config, _ := getConfig(t)
config, _ := getConfigWithoutPattern(t)
fileExp := fileCons.New()
rnd := sdk.Round(0)
fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&rnd, nil, nil), plugins.MakePluginConfig(config), logger)
require.NoError(t, fileExp.Close())
}

func TestPatternOverride(t *testing.T) {
config, tempdir := getConfig(t)
func TestPatternDefault(t *testing.T) {
config, tempdir := getConfigWithoutPattern(t)
fileExp := fileCons.New()

patternOverride := "PREFIX_%[1]d_block.json"
config = fmt.Sprintf("%sfilename-pattern: '%s'\n", config, patternOverride)

numRounds := 5
sendData(t, fileExp, config, numRounds)

// block data is valid
for i := 0; i < 5; i++ {
filename := fmt.Sprintf(patternOverride, i)
filename := fmt.Sprintf(FilePattern, i)
path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
assert.FileExists(t, path)
require.FileExists(t, path)

var blockData data.BlockData
err := DecodeJSONFromFile(path, &blockData, true)
err := DecodeFromFile(path, &blockData, defaultEncodingFormat, defaultIsGzip)
require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
require.NoError(t, err)
require.NotNil(t, blockData.Certificate)
@@ -227,10 +254,10 @@ func TestDropCertificate(t *testing.T) {
for i := 0; i < numRounds; i++ {
filename := fmt.Sprintf(FilePattern, i)
path := fmt.Sprintf("%s/%s", tempdir, filename)
assert.FileExists(t, path)
require.FileExists(t, path)
var blockData data.BlockData
err := DecodeJSONFromFile(path, &blockData, true)
assert.NoError(t, err)
assert.Nil(t, blockData.Certificate)
err := DecodeFromFile(path, &blockData, defaultEncodingFormat, defaultIsGzip)
require.NoError(t, err)
require.Nil(t, blockData.Certificate)
}
}
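
One structural note on the test above: the `pattern := pattern` line re-declares the loop variable so that each parallel subtest captures its own copy (needed before Go 1.22's per-iteration loop variables). A minimal standalone skeleton of that structure, illustrative only:

```go
package filewriter_test

import "testing"

// Skeleton of the table-driven, parallel subtest structure used in
// TestExporterReceive (names and assertions omitted).
func TestPatternsSkeleton(t *testing.T) {
	patterns := []string{
		"%[1]d_block.json",
		"%[1]d_block.json.gz",
		"%[1]d_block.msgp",
		"%[1]d_block.msgp.gz",
	}
	for _, pattern := range patterns {
		pattern := pattern // capture the loop variable before t.Parallel
		t.Run(pattern, func(t *testing.T) {
			t.Parallel()
			// per-pattern setup and assertions would go here
			_ = pattern
		})
	}
}
```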
8 changes: 5 additions & 3 deletions conduit/plugins/exporters/filewriter/sample.yaml
@@ -7,9 +7,11 @@ config:

# FilenamePattern is the format used to write block files. It uses go
# string formatting and should accept one number for the round.
# If the file has a '.gz' extension, blocks will be gzipped.
# Default: "%[1]d_block.json"
filename-pattern: "%[1]d_block.json"
# To specify JSON encoding, add a '.json' extension to the filename.
# To specify MessagePack encoding, add a '.msgp' extension to the filename.
# If the file has a '.gz' extension, blocks will be gzipped regardless of encoding.
# Default: "%[1]d_block.msgp.gz"
filename-pattern: "%[1]d_block.msgp.gz"

# DropCertificate is used to remove the vote certificate from the block data before writing files.
drop-certificate: true