
Commit

Add parquet file writing support
chowbao committed Jul 24, 2024
1 parent 741ee9b commit 42e11ce
Showing 16 changed files with 1,585 additions and 13 deletions.
28 changes: 28 additions & 0 deletions cmd/command_utils.go
@@ -6,6 +6,10 @@ import (
"fmt"
"os"
"path/filepath"

"github.com/stellar/stellar-etl/internal/transform"
"github.com/xitongsys/parquet-go-source/local"
"github.com/xitongsys/parquet-go/writer"
)

type CloudStorage interface {
@@ -102,6 +106,10 @@ func exportFilename(start, end uint32, dataType string) string {
return fmt.Sprintf("%d-%d-%s.txt", start, end-1, dataType)
}

func exportParquetFilename(start, end uint32, dataType string) string {
return fmt.Sprintf("%d-%d-%s.parquet", start, end-1, dataType)
}

func deleteLocalFiles(path string) {
err := os.RemoveAll(path)
if err != nil {
@@ -135,3 +143,23 @@ func maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path string) {
cmdLogger.Error("Unknown cloud provider")
}
}

func writeParquet(data []transform.SchemaParquet, path string, schema interface{}) {
	parquetFile, err := local.NewLocalFileWriter(path)
	if err != nil {
		cmdLogger.Fatal("could not create parquet file: ", err)
	}
	defer parquetFile.Close()

	// pw writes records through parquet-go, using the parquet tags on the schema struct.
	pw, err := writer.NewParquetWriter(parquetFile, schema, 1)
	if err != nil {
		cmdLogger.Fatal("could not create parquet file writer: ", err)
	}

	for _, record := range data {
		if err := pw.Write(record.ToParquet()); err != nil {
			cmdLogger.Fatal("could not write record to parquet file: ", err)
		}
	}

	// WriteStop flushes the remaining row groups and writes the parquet footer;
	// calling it explicitly lets any error surface instead of being dropped by a defer.
	if err := pw.WriteStop(); err != nil {
		cmdLogger.Fatal("could not finish writing parquet file: ", err)
	}
}
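
The exporters below hand writeParquet a slice of transform.SchemaParquet values together with a pointer to the matching *OutputParquet schema struct. SchemaParquet itself is defined in the transform package (one of the changed files not expanded in this view); a minimal sketch of the shape writeParquet relies on — the interface name comes from this diff, the body is an assumption — looks like this:

// Sketch only; the real definition lives in internal/transform.
// writeParquet needs exactly one thing from each transformed record:
// a conversion into the parquet-tagged value handed to parquet-go's Write.
type SchemaParquet interface {
	ToParquet() interface{}
}

Callers then follow the pattern visible in export_assets.go below, e.g. writeParquet(transformedAssets, parquetPath, new(transform.AssetOutputParquet)).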
10 changes: 9 additions & 1 deletion cmd/export_assets.go
@@ -18,7 +18,7 @@ var assetsCmd = &cobra.Command{
cmdLogger.SetLevel(logrus.InfoLevel)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
startNum, path, parquetPath, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs)

@@ -40,6 +40,7 @@
seenIDs := map[int64]bool{}
numFailures := 0
totalNumBytes := 0
var transformedAssets []transform.SchemaParquet
for _, transformInput := range paymentOps {
transformed, err := transform.TransformAsset(transformInput.Operation, transformInput.OperationIndex, transformInput.TransactionIndex, transformInput.LedgerSeqNum, transformInput.LedgerCloseMeta)
if err != nil {
@@ -62,6 +63,8 @@
continue
}
totalNumBytes += numBytes

transformedAssets = append(transformedAssets, transformed)
}

outFile.Close()
@@ -70,6 +73,11 @@
printTransformStats(len(paymentOps), numFailures)

maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)

if commonArgs.WriteParquet {
writeParquet(transformedAssets, parquetPath, new(transform.AssetOutputParquet))
maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
}
},
}

10 changes: 10 additions & 0 deletions cmd/export_contract_events.go
@@ -29,6 +29,7 @@ var contractEventsCmd = &cobra.Command{

outFile := mustOutFile(cmdArgs.Path)
numFailures := 0
var transformedEvents []transform.SchemaParquet
for _, transformInput := range transactions {
transformed, err := transform.TransformContractEvent(transformInput.Transaction, transformInput.LedgerHistory)
if err != nil {
@@ -45,14 +46,23 @@
numFailures += 1
continue
}

transformedEvents = append(transformedEvents, contractEvent)
}

}

outFile.Close()

printTransformStats(len(transactions), numFailures)

maybeUpload(cmdArgs.Credentials, cmdArgs.Bucket, cmdArgs.Provider, cmdArgs.Path)

if commonArgs.WriteParquet {
writeParquet(transformedEvents, cmdArgs.ParquetPath, new(transform.ContractEventOutputParquet))
maybeUpload(cmdArgs.Credentials, cmdArgs.Bucket, cmdArgs.Provider, cmdArgs.ParquetPath)
}

},
}

10 changes: 9 additions & 1 deletion cmd/export_effects.go
@@ -16,7 +16,7 @@ var effectsCmd = &cobra.Command{
cmdLogger.SetLevel(logrus.InfoLevel)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
startNum, path, parquetPath, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs)

@@ -28,6 +28,7 @@
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var transformedEffects []transform.SchemaParquet
for _, transformInput := range transactions {
LedgerSeq := uint32(transformInput.LedgerHistory.Header.LedgerSeq)
effects, err := transform.TransformEffect(transformInput.Transaction, LedgerSeq, transformInput.LedgerCloseMeta, env.NetworkPassphrase)
@@ -46,6 +47,8 @@
continue
}
totalNumBytes += numBytes

transformedEffects = append(transformedEffects, transformed)
}
}

@@ -55,6 +58,11 @@
printTransformStats(len(transactions), numFailures)

maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)

if commonArgs.WriteParquet {
writeParquet(transformedEffects, parquetPath, new(transform.EffectOutputParquet))
maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
}
},
}

64 changes: 62 additions & 2 deletions cmd/export_ledger_entry_changes.go
@@ -32,7 +32,7 @@ be exported.`,
cmdLogger.StrictExport = commonArgs.StrictExport
env := utils.GetEnvironmentDetails(commonArgs)

_, configPath, startNum, batchSize, outputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger)
_, configPath, startNum, batchSize, outputFolder, parquetOutputFolder := utils.MustCoreFlags(cmd.Flags(), cmdLogger)
exports := utils.MustExportTypeFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)

@@ -43,6 +43,11 @@
cmdLogger.Fatalf("unable to mkdir %s: %v", outputFolder, err)
}

err = os.MkdirAll(parquetOutputFolder, os.ModePerm)
if err != nil {
cmdLogger.Fatalf("unable to mkdir %s: %v", parquetOutputFolder, err)
}

if batchSize <= 0 {
cmdLogger.Fatalf("batch-size (%d) must be greater than 0", batchSize)
}
@@ -256,11 +261,13 @@ be exported.`,
batch.BatchStart,
batch.BatchEnd,
outputFolder,
parquetOutputFolder,
transformedOutputs,
cloudCredentials,
cloudStorageBucket,
cloudProvider,
commonArgs.Extra,
commonArgs.WriteParquet,
)
if err != nil {
cmdLogger.LogError(err)
@@ -274,23 +281,76 @@
func exportTransformedData(
start, end uint32,
folderPath string,
parquetFolderPath string,
transformedOutput map[string][]interface{},
cloudCredentials, cloudStorageBucket, cloudProvider string,
extra map[string]string) error {
extra map[string]string,
	writeParquetFiles bool) error {

for resource, output := range transformedOutput {
// Filenames are typically exclusive of end point. This processor
// is different and we have to increment by 1 since the end batch number
// is included in this filename.
path := filepath.Join(folderPath, exportFilename(start, end+1, resource))
parquetPath := filepath.Join(parquetFolderPath, exportParquetFilename(start, end+1, resource))
outFile := mustOutFile(path)
var transformedResource []transform.SchemaParquet
var parquetSchema interface{}
var skip bool
for _, o := range output {
_, err := exportEntry(o, outFile, extra)
if err != nil {
return err
}
switch v := o.(type) {
case transform.AccountOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.AccountOutputParquet)
skip = false
case transform.AccountSignerOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.AccountSignerOutputParquet)
skip = false
case transform.ClaimableBalanceOutput:
// Skip
skip = true
case transform.ConfigSettingOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.ConfigSettingOutputParquet)
skip = false
case transform.ContractCodeOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.ContractCodeOutputParquet)
skip = false
case transform.ContractDataOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.ContractDataOutputParquet)
skip = false
case transform.PoolOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.PoolOutputParquet)
skip = false
case transform.OfferOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.OfferOutputParquet)
skip = false
case transform.TrustlineOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.TrustlineOutputParquet)
skip = false
case transform.TtlOutput:
transformedResource = append(transformedResource, v)
parquetSchema = new(transform.TtlOutputParquet)
skip = false
}
}

maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)

		if !skip && writeParquetFiles {
writeParquet(transformedResource, parquetPath, parquetSchema)
maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
}
}

return nil
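
The type switch above pairs each ledger-entry output type with a parquet-tagged schema struct, which parquet-go inspects through reflection when writeParquet passes it to NewParquetWriter. The actual *OutputParquet definitions live in transform package files not expanded in this view; as a purely hypothetical illustration of the xitongsys/parquet-go struct-tag convention they presumably follow (field names and types below are invented for the example, not taken from the repository):

// Hypothetical schema struct; the real transform.AccountOutputParquet will differ.
type AccountOutputParquet struct {
	AccountID string `parquet:"name=account_id, type=BYTE_ARRAY, convertedtype=UTF8"`
	Balance   int64  `parquet:"name=balance, type=INT64"`
	LedgerSeq int64  `parquet:"name=ledger_sequence, type=INT64"`
	Deleted   bool   `parquet:"name=deleted, type=BOOLEAN"`
}

parquet-go derives the file schema from these tags, so writeParquet only needs the new(transform.AccountOutputParquet) pointer plus records whose ToParquet() returns values of that struct type.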
2 changes: 1 addition & 1 deletion cmd/export_ledger_transaction.go
@@ -18,7 +18,7 @@ var ledgerTransactionCmd = &cobra.Command{
cmdLogger.SetLevel(logrus.InfoLevel)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
startNum, path, _, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs)

10 changes: 9 additions & 1 deletion cmd/export_ledgers.go
@@ -18,7 +18,7 @@ var ledgersCmd = &cobra.Command{
cmdLogger.SetLevel(logrus.InfoLevel)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
startNum, path, parquetPath, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs)

@@ -38,6 +38,7 @@

numFailures := 0
totalNumBytes := 0
var transformedLedgers []transform.SchemaParquet
for i, ledger := range ledgers {
transformed, err := transform.TransformLedger(ledger.Ledger, ledger.LCM)
if err != nil {
@@ -53,6 +54,8 @@
continue
}
totalNumBytes += numBytes

transformedLedgers = append(transformedLedgers, transformed)
}

outFile.Close()
@@ -61,6 +64,11 @@
printTransformStats(len(ledgers), numFailures)

maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)

if commonArgs.WriteParquet {
			writeParquet(transformedLedgers, parquetPath, new(transform.LedgerOutputParquet))
			maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
}
},
}

10 changes: 9 additions & 1 deletion cmd/export_operations.go
@@ -18,7 +18,7 @@ var operationsCmd = &cobra.Command{
cmdLogger.SetLevel(logrus.InfoLevel)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
startNum, path, parquetPath, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs)

@@ -30,6 +30,7 @@
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var transformedOps []transform.SchemaParquet
for _, transformInput := range operations {
transformed, err := transform.TransformOperation(transformInput.Operation, transformInput.OperationIndex, transformInput.Transaction, transformInput.LedgerSeqNum, transformInput.LedgerCloseMeta, env.NetworkPassphrase)
if err != nil {
@@ -46,6 +47,8 @@
continue
}
totalNumBytes += numBytes

transformedOps = append(transformedOps, transformed)
}

outFile.Close()
@@ -54,6 +57,11 @@
printTransformStats(len(operations), numFailures)

maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)

if commonArgs.WriteParquet {
			writeParquet(transformedOps, parquetPath, new(transform.OperationOutputParquet))
			maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
}
},
}

10 changes: 9 additions & 1 deletion cmd/export_trades.go
@@ -21,7 +21,7 @@ var tradesCmd = &cobra.Command{
cmdLogger.SetLevel(logrus.InfoLevel)
commonArgs := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = commonArgs.StrictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
startNum, path, parquetPath, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(commonArgs)
cloudStorageBucket, cloudCredentials, cloudProvider := utils.MustCloudStorageFlags(cmd.Flags(), cmdLogger)

@@ -33,6 +33,7 @@
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var transformedTrades []transform.SchemaParquet
for _, tradeInput := range trades {
trades, err := transform.TransformTrade(tradeInput.OperationIndex, tradeInput.OperationHistoryID, tradeInput.Transaction, tradeInput.CloseTime)
if err != nil {
@@ -50,6 +51,8 @@
continue
}
totalNumBytes += numBytes

transformedTrades = append(transformedTrades, transformed)
}
}

@@ -59,6 +62,11 @@
printTransformStats(len(trades), numFailures)

maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, path)

if commonArgs.WriteParquet {
			writeParquet(transformedTrades, parquetPath, new(transform.TradeOutputParquet))
			maybeUpload(cloudCredentials, cloudStorageBucket, cloudProvider, parquetPath)
}
},
}

(remaining changed files not expanded in this view)
