diff --git a/conduit/pipeline/pipeline.go b/conduit/pipeline/pipeline.go
index b77529d4..04797f92 100644
--- a/conduit/pipeline/pipeline.go
+++ b/conduit/pipeline/pipeline.go
@@ -6,7 +6,6 @@ import (
"fmt"
"net/http"
"os"
- "path"
"runtime/pprof"
"sync"
"time"
@@ -14,7 +13,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
- yaml "gopkg.in/yaml.v3"
sdk "github.com/algorand/go-algorand-sdk/v2/types"
@@ -103,12 +101,16 @@ func (p *pipelineImpl) registerPluginMetricsCallbacks() {
}
}
-// makeConfig creates a plugin config from a name and config pair.
+// configWithLogger creates a plugin config from a name and config pair.
// It also creates a logger for the plugin and configures it using the pipeline's log settings.
-func (p *pipelineImpl) makeConfig(cfg data.NameConfigPair, pluginType plugins.PluginType) (*log.Logger, plugins.PluginConfig, error) {
- configs, err := yaml.Marshal(cfg.Config)
+func (p *pipelineImpl) configWithLogger(cfg data.NameConfigPair, pluginType plugins.PluginType) (*log.Logger, plugins.PluginConfig, error) {
+ var dataDir string
+ if p.cfg.ConduitArgs != nil {
+ dataDir = p.cfg.ConduitArgs.ConduitDataDir
+ }
+ config, err := pluginType.GetConfig(cfg, dataDir)
if err != nil {
- return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig(): could not serialize config: %w", err)
+ return nil, plugins.PluginConfig{}, fmt.Errorf("configWithLogger(): unable to create plugin config: %w", err)
}
lgr := log.New()
@@ -116,16 +118,6 @@ func (p *pipelineImpl) makeConfig(cfg data.NameConfigPair, pluginType plugins.Pl
lgr.SetLevel(p.logger.Level)
lgr.SetFormatter(makePluginLogFormatter(string(pluginType), cfg.Name))
- var config plugins.PluginConfig
- config.Config = string(configs)
- if p.cfg != nil && p.cfg.ConduitArgs != nil {
- config.DataDir = path.Join(p.cfg.ConduitArgs.ConduitDataDir, fmt.Sprintf("%s_%s", pluginType, cfg.Name))
- err = os.MkdirAll(config.DataDir, os.ModePerm)
- if err != nil {
- return nil, plugins.PluginConfig{}, fmt.Errorf("makeConfig: unable to create plugin data directory: %w", err)
- }
- }
-
return lgr, config, nil
}
@@ -171,7 +163,7 @@ func (p *pipelineImpl) pluginRoundOverride() (uint64, error) {
var pluginOverride uint64
var pluginOverrideName string // cache this in case of error.
for _, part := range parts {
- _, config, err := p.makeConfig(part.cfg, part.t)
+ _, config, err := p.configWithLogger(part.cfg, part.t)
if err != nil {
return 0, err
}
@@ -302,7 +294,7 @@ func (p *pipelineImpl) Init() error {
// Initialize Importer
{
- importerLogger, pluginConfig, err := p.makeConfig(p.cfg.Importer, plugins.Importer)
+ importerLogger, pluginConfig, err := p.configWithLogger(p.cfg.Importer, plugins.Importer)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not make %s config: %w", p.cfg.Importer.Name, err)
}
@@ -335,7 +327,7 @@ func (p *pipelineImpl) Init() error {
// Initialize Processors
for idx, processor := range p.processors {
ncPair := p.cfg.Processors[idx]
- logger, config, err := p.makeConfig(ncPair, plugins.Processor)
+ logger, config, err := p.configWithLogger(ncPair, plugins.Processor)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", ncPair, err)
}
@@ -348,7 +340,7 @@ func (p *pipelineImpl) Init() error {
// Initialize Exporter
{
- logger, config, err := p.makeConfig(p.cfg.Exporter, plugins.Exporter)
+ logger, config, err := p.configWithLogger(p.cfg.Exporter, plugins.Exporter)
if err != nil {
return fmt.Errorf("Pipeline.Init(): could not initialize processor (%s): %w", p.cfg.Exporter.Name, err)
}
diff --git a/conduit/plugins/exporters/filewriter/file_exporter.go b/conduit/plugins/exporters/filewriter/file_exporter.go
index bcb3f7f6..877e0510 100644
--- a/conduit/plugins/exporters/filewriter/file_exporter.go
+++ b/conduit/plugins/exporters/filewriter/file_exporter.go
@@ -18,13 +18,24 @@ import (
const (
// PluginName to use when configuring.
PluginName = "file_writer"
+
// FilePattern is used to name the output files.
- FilePattern = "%[1]d_block.json"
+ FilePattern = "%[1]d_block.msgp.gz"
+)
+
+// EncodingFormat enumerates the encodings supported for block and genesis files.
+type EncodingFormat byte
+
+const (
+ // MessagepackFormat indicates msgpack encoding.
+ MessagepackFormat EncodingFormat = iota
+ // JSONFormat indicates JSON encoding.
+ JSONFormat
+ // UnrecognizedFormat indicates the format could not be deduced from the filename pattern.
+ UnrecognizedFormat
)
type fileExporter struct {
round uint64
cfg Config
+ gzip bool
+ format EncodingFormat
logger *logrus.Logger
}
@@ -51,6 +62,11 @@ func (exp *fileExporter) Init(_ context.Context, initProvider data.InitProvider,
if exp.cfg.FilenamePattern == "" {
exp.cfg.FilenamePattern = FilePattern
}
+ exp.format, exp.gzip, err = ParseFilenamePattern(exp.cfg.FilenamePattern)
+ if err != nil {
+ return fmt.Errorf("Init() error: %w", err)
+ }
+
// default to the data directory if no override provided.
if exp.cfg.BlocksDir == "" {
exp.cfg.BlocksDir = cfg.DataDir
@@ -64,7 +80,21 @@ func (exp *fileExporter) Init(_ context.Context, initProvider data.InitProvider,
return fmt.Errorf("Init() error: %w", err)
}
exp.round = uint64(initProvider.NextDBRound())
- return err
+
+ // export the genesis as well in the same format
+ genesis := initProvider.GetGenesis()
+ genesisFile, err := GenesisFilename(exp.format, exp.gzip)
+ if err != nil {
+ return fmt.Errorf("Init() error: %w", err)
+ }
+
+ genesisPath := path.Join(exp.cfg.BlocksDir, genesisFile)
+ err = EncodeToFile(genesisPath, genesis, exp.format, exp.gzip)
+ if err != nil {
+ return fmt.Errorf("Init() error sending to genesisPath=%s: %w", genesisPath, err)
+ }
+
+ return nil
}
func (exp *fileExporter) Close() error {
@@ -87,10 +117,12 @@ func (exp *fileExporter) Receive(exportData data.BlockData) error {
}
blockFile := path.Join(exp.cfg.BlocksDir, fmt.Sprintf(exp.cfg.FilenamePattern, exportData.Round()))
- err := EncodeJSONToFile(blockFile, exportData, true)
+
+ err := EncodeToFile(blockFile, &exportData, exp.format, exp.gzip)
if err != nil {
return fmt.Errorf("Receive(): failed to write file %s: %w", blockFile, err)
}
+
exp.logger.Infof("Wrote block %d to %s", exportData.Round(), blockFile)
}
diff --git a/conduit/plugins/exporters/filewriter/file_exporter_test.go b/conduit/plugins/exporters/filewriter/file_exporter_test.go
index 555df398..3c12ead7 100644
--- a/conduit/plugins/exporters/filewriter/file_exporter_test.go
+++ b/conduit/plugins/exporters/filewriter/file_exporter_test.go
@@ -9,7 +9,6 @@ import (
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
- "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
@@ -21,29 +20,45 @@ import (
sdk "github.com/algorand/go-algorand-sdk/v2/types"
)
+const (
+ defaultEncodingFormat = MessagepackFormat
+ defaultIsGzip = true
+)
+
var logger *logrus.Logger
var fileCons = exporters.ExporterConstructorFunc(func() exporters.Exporter {
return &fileExporter{}
})
-var configTemplate = "block-dir: %s/blocks\n"
+var configTemplatePrefix = "block-dir: %s/blocks\n"
var round = sdk.Round(2)
func init() {
logger, _ = test.NewNullLogger()
}
-func getConfig(t *testing.T) (config, tempdir string) {
+func getConfigWithoutPattern(t *testing.T) (config, tempdir string) {
tempdir = t.TempDir()
- config = fmt.Sprintf(configTemplate, tempdir)
+ config = fmt.Sprintf(configTemplatePrefix, tempdir)
+ return
+}
+
+func getConfigWithPattern(t *testing.T, pattern string) (config, tempdir string) {
+ config, tempdir = getConfigWithoutPattern(t)
+ config = fmt.Sprintf("%sfilename-pattern: '%s'\n", config, pattern)
return
}
+func TestDefaults(t *testing.T) {
+ require.Equal(t, defaultEncodingFormat, MessagepackFormat)
+ require.Equal(t, defaultIsGzip, true)
+}
+
func TestExporterMetadata(t *testing.T) {
fileExp := fileCons.New()
meta := fileExp.Metadata()
- assert.Equal(t, metadata.Name, meta.Name)
- assert.Equal(t, metadata.Description, meta.Description)
- assert.Equal(t, metadata.Deprecated, meta.Deprecated)
+ require.Equal(t, metadata.Name, meta.Name)
+ require.Equal(t, metadata.Description, meta.Description)
+ require.Equal(t, metadata.Deprecated, meta.Deprecated)
}
func TestExporterInitDefaults(t *testing.T) {
@@ -87,18 +102,18 @@ func TestExporterInitDefaults(t *testing.T) {
}
func TestExporterInit(t *testing.T) {
- config, _ := getConfig(t)
+ config, _ := getConfigWithPattern(t, "%[1]d_block.json")
fileExp := fileCons.New()
defer fileExp.Close()
// creates a new output file
err := fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), plugins.MakePluginConfig(config), logger)
- assert.NoError(t, err)
+ require.NoError(t, err)
fileExp.Close()
// can open existing file
err = fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&round, nil, nil), plugins.MakePluginConfig(config), logger)
- assert.NoError(t, err)
+ require.NoError(t, err)
fileExp.Close()
}
@@ -155,55 +170,67 @@ func sendData(t *testing.T, fileExp exporters.Exporter, config string, numRounds
}
func TestExporterReceive(t *testing.T) {
- config, tempdir := getConfig(t)
- fileExp := fileCons.New()
- numRounds := 5
- sendData(t, fileExp, config, numRounds)
-
- // block data is valid
- for i := 0; i < 5; i++ {
- filename := fmt.Sprintf(FilePattern, i)
- path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
- require.FileExists(t, path)
-
- blockBytes, err := os.ReadFile(path)
- require.NoError(t, err)
- assert.NotContains(t, string(blockBytes), " 0: ")
+ patterns := []string{
+ "%[1]d_block.json",
+ "%[1]d_block.json.gz",
+ "%[1]d_block.msgp",
+ "%[1]d_block.msgp.gz",
+ }
+ for _, pattern := range patterns {
+ pattern := pattern
+ t.Run(pattern, func(t *testing.T) {
+ t.Parallel()
- var blockData data.BlockData
- err = DecodeJSONFromFile(path, &blockData, true)
- require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
- require.NoError(t, err)
- require.NotNil(t, blockData.Certificate)
+ format, isGzip, err := ParseFilenamePattern(pattern)
+ require.NoError(t, err)
+ config, tempdir := getConfigWithPattern(t, pattern)
+ fileExp := fileCons.New()
+ numRounds := 5
+ sendData(t, fileExp, config, numRounds)
+
+ // block data is valid
+ for i := 0; i < 5; i++ {
+ filename := fmt.Sprintf(pattern, i)
+ path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
+ require.FileExists(t, path)
+
+ blockBytes, err := os.ReadFile(path)
+ require.NoError(t, err)
+ require.NotContains(t, string(blockBytes), " 0: ")
+
+ var blockData data.BlockData
+ err = DecodeFromFile(path, &blockData, format, isGzip)
+ require.NoError(t, err)
+ require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
+ require.NotNil(t, blockData.Certificate)
+ }
+ })
}
}
func TestExporterClose(t *testing.T) {
- config, _ := getConfig(t)
+ config, _ := getConfigWithoutPattern(t)
fileExp := fileCons.New()
rnd := sdk.Round(0)
fileExp.Init(context.Background(), conduit.MakePipelineInitProvider(&rnd, nil, nil), plugins.MakePluginConfig(config), logger)
require.NoError(t, fileExp.Close())
}
-func TestPatternOverride(t *testing.T) {
- config, tempdir := getConfig(t)
+func TestPatternDefault(t *testing.T) {
+ config, tempdir := getConfigWithoutPattern(t)
fileExp := fileCons.New()
- patternOverride := "PREFIX_%[1]d_block.json"
- config = fmt.Sprintf("%sfilename-pattern: '%s'\n", config, patternOverride)
-
numRounds := 5
sendData(t, fileExp, config, numRounds)
// block data is valid
for i := 0; i < 5; i++ {
- filename := fmt.Sprintf(patternOverride, i)
+ filename := fmt.Sprintf(FilePattern, i)
path := fmt.Sprintf("%s/blocks/%s", tempdir, filename)
- assert.FileExists(t, path)
+ require.FileExists(t, path)
var blockData data.BlockData
- err := DecodeJSONFromFile(path, &blockData, true)
+ err := DecodeFromFile(path, &blockData, defaultEncodingFormat, defaultIsGzip)
require.Equal(t, sdk.Round(i), blockData.BlockHeader.Round)
require.NoError(t, err)
require.NotNil(t, blockData.Certificate)
@@ -227,10 +254,10 @@ func TestDropCertificate(t *testing.T) {
for i := 0; i < numRounds; i++ {
filename := fmt.Sprintf(FilePattern, i)
path := fmt.Sprintf("%s/%s", tempdir, filename)
- assert.FileExists(t, path)
+ require.FileExists(t, path)
var blockData data.BlockData
- err := DecodeJSONFromFile(path, &blockData, true)
- assert.NoError(t, err)
- assert.Nil(t, blockData.Certificate)
+ err := DecodeFromFile(path, &blockData, defaultEncodingFormat, defaultIsGzip)
+ require.NoError(t, err)
+ require.Nil(t, blockData.Certificate)
}
}
diff --git a/conduit/plugins/exporters/filewriter/sample.yaml b/conduit/plugins/exporters/filewriter/sample.yaml
index 59895661..11c6534a 100644
--- a/conduit/plugins/exporters/filewriter/sample.yaml
+++ b/conduit/plugins/exporters/filewriter/sample.yaml
@@ -8,8 +8,8 @@ config:
# FilenamePattern is the format used to write block files. It uses go
# string formatting and should accept one number for the round.
# If the file has a '.gz' extension, blocks will be gzipped.
- # Default: "%[1]d_block.json"
- filename-pattern: "%[1]d_block.json"
+ # Default: "%[1]d_block.msgp.gz"
+ filename-pattern: "%[1]d_block.msgp.gz"
# DropCertificate is used to remove the vote certificate from the block data before writing files.
drop-certificate: true
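For quick reference, these are the suffixes the new pattern parser accepts and what they map to. Note that with this change an unrecognized extension becomes an Init()-time error rather than being written as JSON. A small sketch against ParseFilenamePattern (added in util.go below):

package main

import (
	"fmt"

	"github.com/algorand/conduit/conduit/plugins/exporters/filewriter"
)

func main() {
	for _, pattern := range []string{
		"%[1]d_block.json",    // JSON, uncompressed
		"%[1]d_block.json.gz", // JSON, gzipped
		"%[1]d_block.msgp",    // msgpack, uncompressed
		"%[1]d_block.msgp.gz", // msgpack, gzipped (the new default)
		"%[1]d_block.txt",     // now an error instead of silently JSON
	} {
		format, isGzip, err := filewriter.ParseFilenamePattern(pattern)
		fmt.Println(pattern, "->", format, isGzip, err)
	}
}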
diff --git a/conduit/plugins/exporters/filewriter/util.go b/conduit/plugins/exporters/filewriter/util.go
index 313bd323..f7e8b99c 100644
--- a/conduit/plugins/exporters/filewriter/util.go
+++ b/conduit/plugins/exporters/filewriter/util.go
@@ -1,7 +1,6 @@
package filewriter
import (
- "bytes"
"compress/gzip"
"fmt"
"io"
@@ -9,44 +8,73 @@ import (
"strings"
"github.com/algorand/go-algorand-sdk/v2/encoding/json"
+ "github.com/algorand/go-algorand-sdk/v2/encoding/msgpack"
"github.com/algorand/go-codec/codec"
)
-var prettyHandle *codec.JsonHandle
-var jsonStrictHandle *codec.JsonHandle
+var jsonPrettyHandle *codec.JsonHandle
func init() {
- prettyHandle = new(codec.JsonHandle)
- prettyHandle.ErrorIfNoField = json.CodecHandle.ErrorIfNoField
- prettyHandle.ErrorIfNoArrayExpand = json.CodecHandle.ErrorIfNoArrayExpand
- prettyHandle.Canonical = json.CodecHandle.Canonical
- prettyHandle.RecursiveEmptyCheck = json.CodecHandle.RecursiveEmptyCheck
- prettyHandle.Indent = json.CodecHandle.Indent
- prettyHandle.HTMLCharsAsIs = json.CodecHandle.HTMLCharsAsIs
- prettyHandle.MapKeyAsString = true
- prettyHandle.Indent = 2
-
- jsonStrictHandle = new(codec.JsonHandle)
- jsonStrictHandle.ErrorIfNoField = prettyHandle.ErrorIfNoField
- jsonStrictHandle.ErrorIfNoArrayExpand = prettyHandle.ErrorIfNoArrayExpand
- jsonStrictHandle.Canonical = prettyHandle.Canonical
- jsonStrictHandle.RecursiveEmptyCheck = prettyHandle.RecursiveEmptyCheck
- jsonStrictHandle.Indent = prettyHandle.Indent
- jsonStrictHandle.HTMLCharsAsIs = prettyHandle.HTMLCharsAsIs
- jsonStrictHandle.MapKeyAsString = true
+ jsonPrettyHandle = new(codec.JsonHandle)
+ jsonPrettyHandle.ErrorIfNoField = json.CodecHandle.ErrorIfNoField
+ jsonPrettyHandle.ErrorIfNoArrayExpand = json.CodecHandle.ErrorIfNoArrayExpand
+ jsonPrettyHandle.Canonical = json.CodecHandle.Canonical
+ jsonPrettyHandle.RecursiveEmptyCheck = json.CodecHandle.RecursiveEmptyCheck
+ jsonPrettyHandle.Indent = json.CodecHandle.Indent
+ jsonPrettyHandle.HTMLCharsAsIs = json.CodecHandle.HTMLCharsAsIs
+ jsonPrettyHandle.MapKeyAsString = true
+ jsonPrettyHandle.Indent = 2
}
-// EncodeJSONToFile is used to encode an object to a file. If the file ends in .gz it will be gzipped.
-func EncodeJSONToFile(filename string, v interface{}, pretty bool) error {
- var writer io.Writer
+// ParseFilenamePattern infers the block encoding format and gzip compression from the extension of a filename pattern.
+func ParseFilenamePattern(pattern string) (EncodingFormat, bool, error) {
+ originalPattern := pattern
+ isGzip := false
+ if strings.HasSuffix(pattern, ".gz") {
+ isGzip = true
+ pattern = pattern[:len(pattern)-3]
+ }
+
+ var blockFormat EncodingFormat
+ if strings.HasSuffix(pattern, ".msgp") {
+ blockFormat = MessagepackFormat
+ } else if strings.HasSuffix(pattern, ".json") {
+ blockFormat = JSONFormat
+ } else {
+ return UnrecognizedFormat, false, fmt.Errorf("unrecognized export format: %s", originalPattern)
+ }
+
+ return blockFormat, isGzip, nil
+}
+
+// GenesisFilename returns the name of the genesis file for the given format and compression.
+func GenesisFilename(format EncodingFormat, isGzip bool) (string, error) {
+ var ext string
+
+ switch format {
+ case JSONFormat:
+ ext = ".json"
+ case MessagepackFormat:
+ ext = ".msgp"
+ default:
+ return "", fmt.Errorf("GenesisFilename(): unhandled format %d", format)
+ }
+ if isGzip {
+ ext += ".gz"
+ }
+
+ return fmt.Sprintf("genesis%s", ext), nil
+}
+
+// EncodeToFile encodes an object to a file using the given format and optional gzip compression.
+func EncodeToFile(filename string, v interface{}, format EncodingFormat, isGzip bool) error {
file, err := os.Create(filename)
if err != nil {
- return fmt.Errorf("EncodeJSONToFile(): failed to create %s: %w", filename, err)
+ return fmt.Errorf("EncodeToFile(): failed to create %s: %w", filename, err)
}
defer file.Close()
- if strings.HasSuffix(filename, ".gz") {
+ var writer io.Writer
+ if isGzip {
gz := gzip.NewWriter(file)
gz.Name = filename
defer gz.Close()
@@ -55,41 +83,50 @@ func EncodeJSONToFile(filename string, v interface{}, pretty bool) error {
writer = file
}
- var handle *codec.JsonHandle
- if pretty {
- handle = prettyHandle
- } else {
- handle = jsonStrictHandle
+ return Encode(format, writer, v)
+}
+
+// Encode writes the encoding of an object to the given writer using the given format.
+func Encode(format EncodingFormat, writer io.Writer, v interface{}) error {
+ var handle codec.Handle
+ switch format {
+ case JSONFormat:
+ handle = jsonPrettyHandle
+ case MessagepackFormat:
+ handle = msgpack.LenientCodecHandle
+ default:
+ return fmt.Errorf("EncodeToFile(): unhandled format %d", format)
}
- enc := codec.NewEncoder(writer, handle)
- return enc.Encode(v)
+ return codec.NewEncoder(writer, handle).Encode(v)
}
-// DecodeJSONFromFile is used to decode a file to an object.
-func DecodeJSONFromFile(filename string, v interface{}, strict bool) error {
- // Streaming into the decoder was slow.
- fileBytes, err := os.ReadFile(filename)
+// DecodeFromFile decodes a file to an object using the given format and optional gzip compression.
+func DecodeFromFile(filename string, v interface{}, format EncodingFormat, isGzip bool) error {
+ file, err := os.Open(filename)
if err != nil {
- return fmt.Errorf("DecodeJSONFromFile(): failed to read %s: %w", filename, err)
+ return fmt.Errorf("DecodeFromFile(): failed to open %s: %w", filename, err)
}
+ defer file.Close()
- var reader io.Reader = bytes.NewReader(fileBytes)
-
- if strings.HasSuffix(filename, ".gz") {
- gz, err := gzip.NewReader(reader)
+ var reader io.Reader
+ if isGzip {
+ gz, err := gzip.NewReader(file)
if err != nil {
- return fmt.Errorf("DecodeJSONFromFile(): failed to make gzip reader: %w", err)
+ return fmt.Errorf("DecodeFromFile(): failed to make gzip reader: %w", err)
}
defer gz.Close()
reader = gz
- }
- var handle *codec.JsonHandle
- if strict {
- handle = json.CodecHandle
} else {
- handle = json.LenientCodecHandle
+ reader = file
}
- enc := codec.NewDecoder(reader, handle)
- return enc.Decode(v)
+ var handle codec.Handle
+ switch format {
+ case JSONFormat:
+ handle = json.LenientCodecHandle
+ case MessagepackFormat:
+ handle = msgpack.LenientCodecHandle
+ default:
+ return fmt.Errorf("DecodeFromFile(): unhandled format %d", format)
+ }
+ return codec.NewDecoder(reader, handle).Decode(v)
}
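A round-trip sketch of the two file helpers. Unlike the old DecodeJSONFromFile, the decoder no longer sniffs the '.gz' suffix from the filename; callers pass the same format and gzip flags to both sides. The record type and codec tags here are illustrative:

package main

import (
	"fmt"
	"os"
	"path"

	"github.com/algorand/conduit/conduit/plugins/exporters/filewriter"
)

// record is an illustrative payload; any codec-encodable value works.
type record struct {
	Round uint64 `codec:"round"`
	Note  string `codec:"note"`
}

func main() {
	dir, err := os.MkdirTemp("", "filewriter")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)

	in := record{Round: 42, Note: "hello"}
	file := path.Join(dir, "42_block.msgp.gz")

	// Gzipped msgpack out...
	if err := filewriter.EncodeToFile(file, &in, filewriter.MessagepackFormat, true); err != nil {
		panic(err)
	}

	// ...and back, with the same format/gzip pair on the read side.
	var out record
	if err := filewriter.DecodeFromFile(file, &out, filewriter.MessagepackFormat, true); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", out)
}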
diff --git a/conduit/plugins/exporters/filewriter/util_test.go b/conduit/plugins/exporters/filewriter/util_test.go
index ba017e28..0e89716b 100644
--- a/conduit/plugins/exporters/filewriter/util_test.go
+++ b/conduit/plugins/exporters/filewriter/util_test.go
@@ -1,13 +1,163 @@
package filewriter
import (
- "io/ioutil"
+ "os"
"path"
"testing"
"github.com/stretchr/testify/require"
)
+
+func TestParseFilenameFormat(t *testing.T) {
+ testCases := []struct {
+ name string
+ format string
+ gzip bool
+ blockFormat EncodingFormat
+ err string
+ }{
+ {
+ name: "messagepack vanilla",
+ format: "%d_block.msgp",
+ gzip: false,
+ blockFormat: MessagepackFormat,
+ err: "",
+ },
+ {
+ name: "messagepack gzip",
+ format: "%d_block.msgp.gz",
+ gzip: true,
+ blockFormat: MessagepackFormat,
+ err: "",
+ },
+ {
+ name: "json vanilla",
+ format: "%d_block.json",
+ gzip: false,
+ blockFormat: JSONFormat,
+ err: "",
+ },
+ {
+ name: "json gzip",
+ format: "%d_block.json.gz",
+ gzip: true,
+ blockFormat: JSONFormat,
+ err: "",
+ },
+ {
+ name: "messagepack vanilla 2",
+ format: "%[1]d_block round%[1]d.msgp",
+ gzip: false,
+ blockFormat: MessagepackFormat,
+ err: "",
+ },
+ {
+ name: "messagepack gzip 2",
+ format: "%[1]d_block round%[1]d.msgp.gz",
+ gzip: true,
+ blockFormat: MessagepackFormat,
+ err: "",
+ },
+ {
+ name: "json vanilla 2",
+ format: "%[1]d_block round%[1]d.json",
+ gzip: false,
+ blockFormat: JSONFormat,
+ err: "",
+ },
+ {
+ name: "json gzip 2",
+ format: "%[1]d_block round%[1]d.json.gz",
+ gzip: true,
+ blockFormat: JSONFormat,
+ err: "",
+ },
+ {
+ name: "invalid - gzip",
+ format: "%d_block.msgp.gzip",
+ gzip: false,
+ blockFormat: UnrecognizedFormat,
+ err: "unrecognized export format",
+ },
+ {
+ name: "invalid - no extension",
+ format: "%d_block",
+ gzip: false,
+ blockFormat: UnrecognizedFormat,
+ err: "unrecognized export format",
+ },
+ }
+ for _, tc := range testCases {
+ tc := tc
+ t.Run(tc.name, func(t *testing.T) {
+ t.Parallel()
+
+ blockFormat, gzip, err := ParseFilenamePattern(tc.format)
+ if tc.err == "" {
+ require.NoError(t, err)
+ require.Equal(t, tc.gzip, gzip)
+ require.Equal(t, tc.blockFormat, blockFormat)
+ } else {
+ require.ErrorContains(t, err, tc.err)
+ }
+ })
+ }
+}
+
+func TestGenesisFilename(t *testing.T) {
+ testCases := []struct {
+ blockFormat EncodingFormat
+ gzip bool
+ result string
+ err string
+ }{
+ {
+ blockFormat: MessagepackFormat,
+ gzip: false,
+ result: "genesis.msgp",
+ err: "",
+ },
+ {
+ blockFormat: MessagepackFormat,
+ gzip: true,
+ result: "genesis.msgp.gz",
+ err: "",
+ },
+ {
+ blockFormat: JSONFormat,
+ gzip: false,
+ result: "genesis.json",
+ err: "",
+ },
+ {
+ blockFormat: JSONFormat,
+ gzip: true,
+ result: "genesis.json.gz",
+ err: "",
+ },
+ {
+ result: "error case",
+ blockFormat: EncodingFormat(42),
+ err: "GenesisFilename(): unhandled format 42",
+ },
+ }
+ for _, tc := range testCases {
+ tc := tc
+ t.Run(tc.result, func(t *testing.T) {
+ t.Parallel()
+
+ filename, err := GenesisFilename(tc.blockFormat, tc.gzip)
+ if tc.err == "" {
+ require.NoError(t, err)
+ require.Equal(t, tc.result, filename)
+ } else {
+ require.ErrorContains(t, err, tc.err)
+ }
+ })
+ }
+}
+
func TestEncodeToAndFromFile(t *testing.T) {
tempdir := t.TempDir()
@@ -25,16 +175,17 @@ func TestEncodeToAndFromFile(t *testing.T) {
}
{
- pretty := path.Join(tempdir, "pretty.json")
- err := EncodeJSONToFile(pretty, data, true)
+ jsonFile := path.Join(tempdir, "json.json")
+ err := EncodeToFile(jsonFile, data, JSONFormat, false)
require.NoError(t, err)
- require.FileExists(t, pretty)
+ require.FileExists(t, jsonFile)
var testDecode test
- err = DecodeJSONFromFile(pretty, &testDecode, false)
+ err = DecodeFromFile(jsonFile, &testDecode, JSONFormat, false)
+ require.NoError(t, err)
require.Equal(t, data, testDecode)
// Check the pretty printing
- bytes, err := ioutil.ReadFile(pretty)
+ bytes, err := os.ReadFile(jsonFile)
require.NoError(t, err)
require.Contains(t, string(bytes), " \"one\": \"one\",\n")
require.Contains(t, string(bytes), `"0": "int-key"`)
@@ -42,22 +193,24 @@ func TestEncodeToAndFromFile(t *testing.T) {
{
small := path.Join(tempdir, "small.json")
- err := EncodeJSONToFile(small, data, false)
+ err := EncodeToFile(small, data, JSONFormat, false)
require.NoError(t, err)
require.FileExists(t, small)
var testDecode test
- err = DecodeJSONFromFile(small, &testDecode, false)
+ err = DecodeFromFile(small, &testDecode, JSONFormat, false)
+ require.NoError(t, err)
require.Equal(t, data, testDecode)
}
// gzip test
{
small := path.Join(tempdir, "small.json.gz")
- err := EncodeJSONToFile(small, data, false)
+ err := EncodeToFile(small, data, JSONFormat, true)
require.NoError(t, err)
require.FileExists(t, small)
var testDecode test
- err = DecodeJSONFromFile(small, &testDecode, false)
+ err = DecodeFromFile(small, &testDecode, JSONFormat, true)
+ require.NoError(t, err)
require.Equal(t, data, testDecode)
}
}
diff --git a/conduit/plugins/importers/filereader/README.md b/conduit/plugins/importers/filereader/README.md
index cd023d71..2418be2b 100644
--- a/conduit/plugins/importers/filereader/README.md
+++ b/conduit/plugins/importers/filereader/README.md
@@ -10,16 +10,7 @@ config:
# The directory is created if it doesn't exist. If no directory is provided
# blocks are written to the Conduit data directory.
#block-dir: "/path/to/directory"
-
- # RetryDuration controls the delay between checks when the importer has
- # caught up and is waiting for new blocks to appear.
- retry-duration: "5s"
-
- # RetryCount controls the number of times to check for a missing block
- # before generating an error. The retry count and retry duration should
- # be configured according the expected round time.
- retry-count: 5
-
+
# FilenamePattern is the format used to find block files. It uses go string
# formatting and should accept one number for the round.
filename-pattern: "%[1]d_block.json"
diff --git a/conduit/plugins/importers/filereader/fileReadWrite_test.go b/conduit/plugins/importers/filereader/fileReadWrite_test.go
new file mode 100644
index 00000000..b7b37efe
--- /dev/null
+++ b/conduit/plugins/importers/filereader/fileReadWrite_test.go
@@ -0,0 +1,191 @@
+package fileimporter
+
+import (
+ "bytes"
+ "compress/gzip"
+ "context"
+ "fmt"
+ "os"
+ "path"
+ "strings"
+ "testing"
+
+ logrusTest "github.com/sirupsen/logrus/hooks/test"
+ "github.com/stretchr/testify/require"
+
+ "github.com/algorand/conduit/conduit"
+ "github.com/algorand/conduit/conduit/data"
+ "github.com/algorand/conduit/conduit/plugins"
+ "github.com/algorand/conduit/conduit/plugins/exporters"
+ "github.com/algorand/conduit/conduit/plugins/exporters/filewriter"
+ "github.com/algorand/conduit/conduit/plugins/importers"
+ sdk "github.com/algorand/go-algorand-sdk/v2/types"
+)
+
+const (
+ conduitDataDir = "test_resources/conduit_data"
+ filePattern = "%[1]d_block.msgp.gz"
+ importerBlockDir = "test_resources/filereader_blocks"
+ exporterBlockDir = "test_resources/conduit_data/exporter_file_writer"
+)
+
+func cleanArtifacts(t *testing.T) {
+ err := os.RemoveAll(exporterBlockDir)
+ require.NoError(t, err)
+}
+
+// numGzippedFiles returns the number of files in the importerBlockDir
+// whose filename ends in .gz
+func numGzippedFiles(t *testing.T) uint64 {
+ files, err := os.ReadDir(importerBlockDir)
+ require.NoError(t, err)
+
+ gzCount := uint64(0)
+ for _, file := range files {
+ if strings.HasSuffix(file.Name(), ".gz") {
+ gzCount++
+ }
+ }
+
+ return gzCount
+}
+
+func uncompressBytes(t *testing.T, path string) []byte {
+ file, err := os.Open(path)
+ require.NoError(t, err, "error opening file %s", path)
+ defer file.Close()
+
+ gr, err := gzip.NewReader(file)
+ require.NoError(t, err, "error creating gzip reader for file %s", path)
+ defer gr.Close()
+
+ var buf bytes.Buffer
+ _, err = buf.ReadFrom(gr)
+ require.NoError(t, err, "error reading file %s", path)
+
+ return buf.Bytes()
+}
+
+func identicalFilesUncompressed(t *testing.T, path1, path2 string) {
+ bytes1 := uncompressBytes(t, path1)
+ bytes2 := uncompressBytes(t, path2)
+ require.Equal(t, len(bytes1), len(bytes2), "files %s and %s have different lengths", path1, path2)
+
+ for i, b1 := range bytes1 {
+ b2 := bytes2[i]
+ require.Equal(t, b1, b2, "files %s and %s differ at byte %d (%s) v (%s)", path1, path2, i, string(b1), string(b2))
+ }
+}
+
+// TestRoundTrip tests that blocks read by the filereader importer
+// under the msgp.gz encoding are written to identical files by the filewriter exporter.
+// This includes both a genesis file and a round-0 block file, whose encodings differ.
+func TestRoundTrip(t *testing.T) {
+ cleanArtifacts(t)
+ defer cleanArtifacts(t)
+
+ round := sdk.Round(0)
+ lastRound := numGzippedFiles(t) - 2 // subtract round-0 and the separate genesis file
+ require.GreaterOrEqual(t, lastRound, uint64(1))
+ require.LessOrEqual(t, lastRound, uint64(1000)) // overflow sanity check
+
+ ctx := context.Background()
+
+ plineConfig, err := data.MakePipelineConfig(&data.Args{
+ ConduitDataDir: conduitDataDir,
+ })
+ require.NoError(t, err)
+
+ logger, _ := logrusTest.NewNullLogger()
+
+ // Assert configurations:
+ require.Equal(t, "file_reader", plineConfig.Importer.Name)
+ require.Equal(t, importerBlockDir, plineConfig.Importer.Config["block-dir"])
+ require.Equal(t, filePattern, plineConfig.Importer.Config["filename-pattern"])
+
+ require.Equal(t, "file_writer", plineConfig.Exporter.Name)
+ require.Equal(t, filePattern, plineConfig.Exporter.Config["filename-pattern"])
+ require.False(t, plineConfig.Exporter.Config["drop-certificate"].(bool))
+
+ // Simulate the portions of the pipeline's Init() that interact
+ // with the importer and exporter
+ initProvider := conduit.MakePipelineInitProvider(&round, nil, nil)
+
+ // Importer init
+ impCtor, err := importers.ImporterConstructorByName(plineConfig.Importer.Name)
+ require.NoError(t, err)
+ importer := impCtor.New()
+ impConfig, err := plugins.Importer.GetConfig(plineConfig.Importer, conduitDataDir)
+ require.NoError(t, err)
+ require.Equal(t, path.Join(conduitDataDir, "importer_file_reader"), impConfig.DataDir)
+
+ err = importer.Init(ctx, initProvider, impConfig, logger)
+ require.NoError(t, err)
+
+ impGenesis, err := importer.GetGenesis()
+ require.NoError(t, err)
+ require.Equal(t, "generated-network", impGenesis.Network)
+
+ // it should be the same as unmarshalling it directly from the expected path
+ genesisFile, err := filewriter.GenesisFilename(filewriter.MessagepackFormat, true)
+ require.Equal(t, "genesis.msgp.gz", genesisFile)
+ require.NoError(t, err)
+
+ impGenesisPath := path.Join(importerBlockDir, genesisFile)
+ genesis := &sdk.Genesis{}
+
+ err = filewriter.DecodeFromFile(impGenesisPath, genesis, filewriter.MessagepackFormat, true)
+ require.NoError(t, err)
+
+ require.Equal(t, impGenesis, genesis)
+
+ initProvider.SetGenesis(impGenesis)
+
+ // Construct the exporter
+ expCtor, err := exporters.ExporterConstructorByName(plineConfig.Exporter.Name)
+ require.NoError(t, err)
+ exporter := expCtor.New()
+ expConfig, err := plugins.Exporter.GetConfig(plineConfig.Exporter, conduitDataDir)
+ require.NoError(t, err)
+ require.Equal(t, path.Join(conduitDataDir, "exporter_file_writer"), expConfig.DataDir)
+
+ err = exporter.Init(ctx, initProvider, expConfig, logger)
+ require.NoError(t, err)
+
+ // It should have persisted the genesis which ought to be identical
+ // to the importer's.
+ expGenesisPath := path.Join(exporterBlockDir, genesisFile)
+ identicalFilesUncompressed(t, impGenesisPath, expGenesisPath)
+
+ // Simulate the pipeline
+ require.Equal(t, sdk.Round(0), round)
+ for ; uint64(round) <= lastRound; round++ {
+ blk, err := importer.GetBlock(uint64(round))
+ require.NoError(t, err)
+
+ expBlockPath := path.Join(exporterBlockDir, fmt.Sprintf(filePattern, round))
+ _, err = os.OpenFile(expBlockPath, os.O_RDONLY, 0)
+ require.ErrorIs(t, err, os.ErrNotExist)
+
+ err = exporter.Receive(blk)
+ require.NoError(t, err)
+
+ _, err = os.OpenFile(expBlockPath, os.O_RDONLY, 0)
+ require.NoError(t, err)
+
+ impBlockPath := path.Join(importerBlockDir, fmt.Sprintf(filePattern, round))
+
+ identicalFilesUncompressed(t, impBlockPath, expBlockPath)
+ }
+}
diff --git a/conduit/plugins/importers/filereader/filereader.go b/conduit/plugins/importers/filereader/filereader.go
index 874e1185..ce2ab7f0 100644
--- a/conduit/plugins/importers/filereader/filereader.go
+++ b/conduit/plugins/importers/filereader/filereader.go
@@ -3,10 +3,11 @@ package fileimporter
import (
"context"
_ "embed" // used to embed config
- "errors"
"fmt"
- "io/fs"
+ "os"
"path"
+ "regexp"
+ "strconv"
"time"
"github.com/sirupsen/logrus"
@@ -25,10 +26,19 @@ const PluginName = "file_reader"
type fileReader struct {
logger *logrus.Logger
cfg Config
+ gzip bool
+ format filewriter.EncodingFormat
ctx context.Context
cancel context.CancelFunc
}
+// package-wide init function
+func init() {
+ importers.Register(PluginName, importers.ImporterConstructorFunc(func() importers.Importer {
+ return &fileReader{}
+ }))
+}
+
// New initializes an algod importer
func New() importers.Importer {
return &fileReader{}
@@ -48,13 +58,6 @@ func (r *fileReader) Metadata() plugins.Metadata {
return metadata
}
-// package-wide init function
-func init() {
- importers.Register(PluginName, importers.ImporterConstructorFunc(func() importers.Importer {
- return &fileReader{}
- }))
-}
-
func (r *fileReader) Init(ctx context.Context, _ data.InitProvider, cfg plugins.PluginConfig, logger *logrus.Logger) error {
r.ctx, r.cancel = context.WithCancel(ctx)
r.logger = logger
@@ -66,20 +69,35 @@ func (r *fileReader) Init(ctx context.Context, _ data.InitProvider, cfg plugins.
if r.cfg.FilenamePattern == "" {
r.cfg.FilenamePattern = filewriter.FilePattern
}
+ r.format, r.gzip, err = filewriter.ParseFilenamePattern(r.cfg.FilenamePattern)
+ if err != nil {
+ return fmt.Errorf("Init() error: %w", err)
+ }
return nil
}
+// GetGenesis returns the genesis. It is assumed that
+// the genesis file is available __in addition to__ the round 0 block file.
+// This is because the encoding assumed for the genesis is different
+// from the encoding assumed for blocks.
+// TODO: handle the case of a multipurpose file that contains both encodings.
func (r *fileReader) GetGenesis() (*sdk.Genesis, error) {
- genesisFile := path.Join(r.cfg.BlocksDir, "genesis.json")
+ genesisFile, err := filewriter.GenesisFilename(r.format, r.gzip)
+ if err != nil {
+ return nil, fmt.Errorf("GetGenesis(): failed to get genesis filename: %w", err)
+ }
+ genesisFile = path.Join(r.cfg.BlocksDir, genesisFile)
+
var genesis sdk.Genesis
- err := filewriter.DecodeJSONFromFile(genesisFile, &genesis, false)
+ err = filewriter.DecodeFromFile(genesisFile, &genesis, r.format, r.gzip)
if err != nil {
return nil, fmt.Errorf("GetGenesis(): failed to process genesis file: %w", err)
}
return &genesis, nil
}
+
func (r *fileReader) Close() error {
if r.cancel != nil {
r.cancel()
@@ -87,32 +105,53 @@ func (r *fileReader) Close() error {
return nil
}
-func (r *fileReader) GetBlock(rnd uint64) (data.BlockData, error) {
- attempts := r.cfg.RetryCount
- for {
- filename := path.Join(r.cfg.BlocksDir, fmt.Sprintf(r.cfg.FilenamePattern, rnd))
- var blockData data.BlockData
- start := time.Now()
- err := filewriter.DecodeJSONFromFile(filename, &blockData, false)
- if err != nil && errors.Is(err, fs.ErrNotExist) {
- // If the file read failed because the file didn't exist, wait before trying again
- if attempts == 0 {
- return data.BlockData{}, fmt.Errorf("GetBlock(): block not found after (%d) attempts", r.cfg.RetryCount)
- }
- attempts--
-
- select {
- case <-time.After(r.cfg.RetryDuration):
- case <-r.ctx.Done():
- return data.BlockData{}, fmt.Errorf("GetBlock() context finished: %w", r.ctx.Err())
- }
- } else if err != nil {
- // Other error, return error to pipeline
- return data.BlockData{}, fmt.Errorf("GetBlock(): unable to read block file '%s': %w", filename, err)
- } else {
- r.logger.Infof("Block %d read time: %s", rnd, time.Since(start))
- // The read was fine, return the data.
- return blockData, nil
+// posErr decorates a decoding error that reports a byte offset ("pos N")
+// with the file content surrounding that position.
+func posErr(file string, err error) error {
+ pattern := `pos (\d+)`
+ re := regexp.MustCompile(pattern)
+
+ // Find the position
+ match := re.FindStringSubmatch(err.Error())
+ var position int
+ if len(match) > 1 {
+ var err2 error
+ position, err2 = strconv.Atoi(match[1])
+ if err2 != nil {
+ return fmt.Errorf("unable to parse position: %w, err: %w", err2, err)
}
+ } else {
+ return fmt.Errorf("unknown error: %w", err)
+ }
+
+ content, err2 := os.ReadFile(file)
+ if err2 != nil {
+ return fmt.Errorf("error reading file: %w, err: %w", err2, err)
+ }
+
+ radius := 20
+ start := position - radius
+ if start < 0 {
+ start = 0
+ }
+ end := position + radius
+ if end > len(content) {
+ end = len(content)
+ }
+
+ return fmt.Errorf(`error in %s @position %d: %w
+<<<<<%s>>>>>`, file, position, err, string(content[start:end]))
+}
+
+func (r *fileReader) GetBlock(rnd uint64) (data.BlockData, error) {
+ filename := path.Join(r.cfg.BlocksDir, fmt.Sprintf(r.cfg.FilenamePattern, rnd))
+ var blockData data.BlockData
+ start := time.Now()
+
+ // Decode the block file using the format implied by the filename pattern.
+ err := filewriter.DecodeFromFile(filename, &blockData, r.format, r.gzip)
+ if err != nil {
+ err = posErr(filename, err)
+ return data.BlockData{}, fmt.Errorf("GetBlock(): unable to read block file '%s': %w", filename, err)
}
+ r.logger.Infof("Block %d read time: %s", rnd, time.Since(start))
+ return blockData, nil
}
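A note on the new posErr helper: go-codec decode errors embed a byte offset of the form "pos N", and posErr quotes the file content in a 20-byte window around that offset. A self-contained sketch of the same windowing logic; contextWindow is a hypothetical name, not part of this PR:

package main

import (
	"fmt"
	"regexp"
	"strconv"
)

var posRe = regexp.MustCompile(`pos (\d+)`)

// contextWindow mirrors posErr's core: extract the byte offset from a codec
// error message and quote radius bytes of content on each side of it.
func contextWindow(content []byte, errMsg string, radius int) (string, error) {
	match := posRe.FindStringSubmatch(errMsg)
	if len(match) < 2 {
		return "", fmt.Errorf("no position in error: %s", errMsg)
	}
	position, err := strconv.Atoi(match[1])
	if err != nil {
		return "", err
	}
	start := position - radius
	if start < 0 {
		start = 0
	}
	end := position + radius
	if end > len(content) {
		end = len(content)
	}
	return string(content[start:end]), nil
}

func main() {
	content := []byte(`{"round": 1, "bad": !!}`)
	window, err := contextWindow(content, "decode failure at pos 20", 5)
	if err != nil {
		panic(err)
	}
	fmt.Printf("<<<<<%s>>>>>\n", window)
}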
diff --git a/conduit/plugins/importers/filereader/filereader_config.go b/conduit/plugins/importers/filereader/filereader_config.go
index bd4f7352..34e6e8c7 100644
--- a/conduit/plugins/importers/filereader/filereader_config.go
+++ b/conduit/plugins/importers/filereader/filereader_config.go
@@ -3,23 +3,13 @@ package fileimporter
//go:generate go run ../../../../cmd/conduit-docs/main.go ../../../../conduit-docs/
//go:generate go run ../../../../cmd/readme_config_includer/generator.go
-import "time"
-
//Name: conduit_importers_filereader
// Config specific to the file importer
type Config struct {
// block-dir is the path to a directory where block data is stored.
BlocksDir string `yaml:"block-dir"`
- /* retry-duration controls the delay between checks when the importer has caught up and is waiting for new blocks to appear.
- The input duration will be interpreted in nanoseconds.
- */
- RetryDuration time.Duration `yaml:"retry-duration"`
- /* retry-count controls the number of times to check for a missing block
- before generating an error. The retry count and retry duration should
- be configured according the expected round time.
- */
- RetryCount uint64 `yaml:"retry-count"`
+
/* filename-pattern is the format used to find block files. It uses go string formatting and should accept one number for the round.
The default pattern is
diff --git a/conduit/plugins/importers/filereader/filereader_test.go b/conduit/plugins/importers/filereader/filereader_test.go
index 42cafc99..8421c9bf 100644
--- a/conduit/plugins/importers/filereader/filereader_test.go
+++ b/conduit/plugins/importers/filereader/filereader_test.go
@@ -6,7 +6,6 @@ import (
"os"
"path"
"testing"
- "time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
@@ -22,10 +21,14 @@ import (
sdk "github.com/algorand/go-algorand-sdk/v2/types"
)
+const (
+ defaultEncodingFormat = filewriter.MessagepackFormat
+ defaultIsGzip = true
+)
+
var (
logger *logrus.Logger
- ctx context.Context
- cancel context.CancelFunc
testImporter importers.Importer
pRound sdk.Round
)
@@ -37,6 +40,12 @@ func init() {
pRound = sdk.Round(1)
}
+func TestDefaults(t *testing.T) {
+ require.Equal(t, defaultEncodingFormat, filewriter.MessagepackFormat)
+ require.Equal(t, defaultIsGzip, true)
+}
+
func TestImporterorterMetadata(t *testing.T) {
testImporter = New()
m := testImporter.Metadata()
@@ -57,7 +66,10 @@ func initializeTestData(t *testing.T, dir string, numRounds int) sdk.Genesis {
Timestamp: 1234,
}
- err := filewriter.EncodeJSONToFile(path.Join(dir, "genesis.json"), genesisA, true)
+ genesisFilename, err := filewriter.GenesisFilename(defaultEncodingFormat, defaultIsGzip)
+ require.NoError(t, err)
+
+ err = filewriter.EncodeToFile(path.Join(dir, genesisFilename), genesisA, defaultEncodingFormat, defaultIsGzip)
require.NoError(t, err)
for i := 0; i < numRounds; i++ {
@@ -70,7 +82,7 @@ func initializeTestData(t *testing.T, dir string, numRounds int) sdk.Genesis {
Certificate: nil,
}
blockFile := path.Join(dir, fmt.Sprintf(filewriter.FilePattern, i))
- err = filewriter.EncodeJSONToFile(blockFile, block, true)
+ err = filewriter.EncodeToFile(blockFile, block, defaultEncodingFormat, defaultIsGzip)
require.NoError(t, err)
}
@@ -83,7 +95,6 @@ func initializeImporter(t *testing.T, numRounds int) (importer importers.Importe
importer = New()
cfg := Config{
BlocksDir: tempdir,
- RetryDuration: 0,
}
data, err := yaml.Marshal(cfg)
require.NoError(t, err)
@@ -122,54 +133,3 @@ func TestGetBlockSuccess(t *testing.T) {
require.Equal(t, sdk.Round(i), block.BlockHeader.Round)
}
}
-
-func TestRetryAndDuration(t *testing.T) {
- tempdir := t.TempDir()
- initializeTestData(t, tempdir, 0)
- importer := New()
- cfg := Config{
- BlocksDir: tempdir,
- RetryDuration: 10 * time.Millisecond,
- RetryCount: 3,
- }
- data, err := yaml.Marshal(cfg)
- require.NoError(t, err)
- err = importer.Init(context.Background(), conduit.MakePipelineInitProvider(&pRound, nil, nil), plugins.MakePluginConfig(string(data)), logger)
- assert.NoError(t, err)
-
- start := time.Now()
- _, err = importer.GetBlock(0)
- assert.ErrorContains(t, err, "GetBlock(): block not found after (3) attempts")
-
- expectedDuration := cfg.RetryDuration*time.Duration(cfg.RetryCount) + 10*time.Millisecond
- assert.WithinDuration(t, start, time.Now(), expectedDuration, "Error should generate after retry count * retry duration")
-}
-
-func TestRetryWithCancel(t *testing.T) {
- tempdir := t.TempDir()
- initializeTestData(t, tempdir, 0)
- importer := New()
- cfg := Config{
- BlocksDir: tempdir,
- RetryDuration: 1 * time.Hour,
- RetryCount: 3,
- }
- data, err := yaml.Marshal(cfg)
- ctx, cancel := context.WithCancel(context.Background())
- require.NoError(t, err)
- err = importer.Init(ctx, conduit.MakePipelineInitProvider(&pRound, nil, nil), plugins.MakePluginConfig(string(data)), logger)
- assert.NoError(t, err)
-
- // Cancel after delay
- delay := 5 * time.Millisecond
- go func() {
- time.Sleep(delay)
- cancel()
- }()
- start := time.Now()
- _, err = importer.GetBlock(0)
- assert.ErrorContains(t, err, "GetBlock() context finished: context canceled")
-
- // within 1ms of the expected time (but much less than the 3hr configuration.
- assert.WithinDuration(t, start, time.Now(), 2*delay)
-}
diff --git a/conduit/plugins/importers/filereader/sample.yaml b/conduit/plugins/importers/filereader/sample.yaml
index d0473c26..fbe64b3b 100644
--- a/conduit/plugins/importers/filereader/sample.yaml
+++ b/conduit/plugins/importers/filereader/sample.yaml
@@ -4,16 +4,7 @@ config:
# The directory is created if it doesn't exist. If no directory is provided
# blocks are written to the Conduit data directory.
#block-dir: "/path/to/directory"
-
- # RetryDuration controls the delay between checks when the importer has
- # caught up and is waiting for new blocks to appear.
- retry-duration: "5s"
-
- # RetryCount controls the number of times to check for a missing block
- # before generating an error. The retry count and retry duration should
- # be configured according the expected round time.
- retry-count: 5
-
+
# FilenamePattern is the format used to find block files. It uses go string
# formatting and should accept one number for the round.
- filename-pattern: "%[1]d_block.json"
+ filename-pattern: "%[1]d_block.msgp.gz"
diff --git a/conduit/plugins/importers/filereader/test_resources/conduit_data/conduit.yml b/conduit/plugins/importers/filereader/test_resources/conduit_data/conduit.yml
new file mode 100644
index 00000000..525bfcaf
--- /dev/null
+++ b/conduit/plugins/importers/filereader/test_resources/conduit_data/conduit.yml
@@ -0,0 +1,72 @@
+# Log verbosity: PANIC, FATAL, ERROR, WARN, INFO, DEBUG, TRACE
+log-level: INFO
+
+# If no log file is provided logs are written to stdout.
+#log-file:
+
+# Number of retries to perform after a pipeline plugin error.
+# Set to 0 to retry forever.
+retry-count: 10
+
+# Time duration to wait between retry attempts.
+retry-delay: "1s"
+
+# Optional filepath to use for pidfile.
+#pid-filepath: /path/to/pidfile
+
+# Whether or not to print the conduit banner on startup.
+hide-banner: false
+
+# When enabled prometheus metrics are available on '/metrics'
+metrics:
+ mode: OFF
+ addr: ":9999"
+ prefix: "conduit"
+
+
+# The importer is typically an algod follower node.
+importer:
+ name: file_reader
+ config:
+ # for purposes of fileReadWrite_test.go we have
+ # CWD = conduit/plugins/importers/filereader
+ # so `test_resources` is immediately available
+ block-dir: "test_resources/filereader_blocks"
+
+ # FilenamePattern is the format used to find block files. It uses go string
+ # formatting and should accept one number for the round.
+ filename-pattern: "%[1]d_block.msgp.gz"
+
+# Zero or more processors may be defined to manipulate what data
+# reaches the exporter.
+processors:
+
+# An exporter is defined to do something with the data.
+exporter:
+ name: "file_writer"
+ config:
+ # BlocksDir is the path to a directory where block data should be stored.
+ # The directory is created if it doesn't exist. If no directory is provided
+ # blocks are written to the Conduit data directory.
+ #block-dir: "/path/to/block/files"
+
+ # FilenamePattern is the format used to write block files. It uses go
+ # string formatting and should accept one number for the round.
+ # If the file has a '.gz' extension, blocks will be gzipped.
+ # Default: "%[1]d_block.msgp.gz"
+ filename-pattern: "%[1]d_block.msgp.gz"
+
+ # DropCertificate is used to remove the vote certificate from the block data before writing files.
+ drop-certificate: false
+
+
+# Enable telemetry for conduit
+telemetry:
+ enabled: false
+
+ # By default the following fields will be configured to send data to Algorand.
+ # To store your own telemetry events, they can be overridden.
+ # uri: ""
+ # index: ""
+ # username: ""
+ # password: ""
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/0_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/0_block.msgp.gz
new file mode 100644
index 00000000..6ad7f275
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/0_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/1_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/1_block.msgp.gz
new file mode 100644
index 00000000..947f7190
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/1_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/2_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/2_block.msgp.gz
new file mode 100644
index 00000000..a8ec23cf
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/2_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/3_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/3_block.msgp.gz
new file mode 100644
index 00000000..6eb23030
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/3_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/4_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/4_block.msgp.gz
new file mode 100644
index 00000000..888e0853
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/4_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/5_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/5_block.msgp.gz
new file mode 100644
index 00000000..293070bf
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/5_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/6_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/6_block.msgp.gz
new file mode 100644
index 00000000..d78db3c1
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/6_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/7_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/7_block.msgp.gz
new file mode 100644
index 00000000..ee609a06
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/7_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/8_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/8_block.msgp.gz
new file mode 100644
index 00000000..f1db8be9
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/8_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/9_block.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/9_block.msgp.gz
new file mode 100644
index 00000000..16d521f1
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/9_block.msgp.gz differ
diff --git a/conduit/plugins/importers/filereader/test_resources/filereader_blocks/genesis.msgp.gz b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/genesis.msgp.gz
new file mode 100644
index 00000000..452327a5
Binary files /dev/null and b/conduit/plugins/importers/filereader/test_resources/filereader_blocks/genesis.msgp.gz differ
diff --git a/conduit/plugins/metadata.go b/conduit/plugins/metadata.go
index 99adf929..e37b9b8b 100644
--- a/conduit/plugins/metadata.go
+++ b/conduit/plugins/metadata.go
@@ -1,5 +1,15 @@
package plugins
+import (
+ "fmt"
+ "os"
+ "path"
+
+ yaml "gopkg.in/yaml.v3"
+
+ "github.com/algorand/conduit/conduit/data"
+)
+
// Metadata returns fields relevant to identification and description of plugins.
type Metadata struct {
Name string
@@ -13,11 +23,31 @@ type PluginType string
const (
// Exporter PluginType
- Exporter = "exporter"
+ Exporter PluginType = "exporter"
// Processor PluginType
- Processor = "processor"
+ Processor PluginType = "processor"
// Importer PluginType
- Importer = "importer"
+ Importer PluginType = "importer"
)
+
+// GetConfig creates an appropriate plugin config for the type.
+func (pt PluginType) GetConfig(cfg data.NameConfigPair, dataDir string) (PluginConfig, error) {
+ configs, err := yaml.Marshal(cfg.Config)
+ if err != nil {
+ return PluginConfig{}, fmt.Errorf("GetConfig(): could not serialize config: %w", err)
+ }
+
+ var config PluginConfig
+ config.Config = string(configs)
+ if dataDir != "" {
+ config.DataDir = path.Join(dataDir, fmt.Sprintf("%s_%s", pt, cfg.Name))
+ err = os.MkdirAll(config.DataDir, os.ModePerm)
+ if err != nil {
+ return PluginConfig{}, fmt.Errorf("GetConfig: unable to create plugin data directory: %w", err)
+ }
+ }
+
+ return config, nil
+}
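Usage sketch for the relocated helper: this is what pipeline.configWithLogger now does internally, minus the logger setup. It assumes NameConfigPair.Config is a map[string]interface{} as elsewhere in the codebase; the temp data directory is illustrative:

package main

import (
	"fmt"
	"os"

	"github.com/algorand/conduit/conduit/data"
	"github.com/algorand/conduit/conduit/plugins"
)

func main() {
	dataDir, err := os.MkdirTemp("", "conduit_data")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dataDir)

	// A name/config pair as it would appear under "exporter:" in conduit.yml.
	pair := data.NameConfigPair{
		Name:   "file_writer",
		Config: map[string]interface{}{"filename-pattern": "%[1]d_block.msgp.gz"},
	}

	// GetConfig serializes the YAML config and provisions the plugin's
	// data directory (<dataDir>/exporter_file_writer) in one step.
	cfg, err := plugins.Exporter.GetConfig(pair, dataDir)
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.DataDir)
	fmt.Print(cfg.Config)
}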