Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create export-tx #198

Merged
merged 7 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions cmd/export_ledger_transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package cmd

import (
"fmt"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/stellar/stellar-etl/internal/input"
"github.com/stellar/stellar-etl/internal/transform"
"github.com/stellar/stellar-etl/internal/utils"
)

var ledgerTransactionCmd = &cobra.Command{
Use: "export_ledger_transaction",
Short: "Exports the ledger_transaction transaction data over a specified range.",
Long: `Exports the ledger_transaction transaction data over a specified range to an output file.`,
Run: func(cmd *cobra.Command, args []string) {
cmdLogger.SetLevel(logrus.InfoLevel)
endNum, strictExport, isTest, isFuture, extra := utils.MustCommonFlags(cmd.Flags(), cmdLogger)
cmdLogger.StrictExport = strictExport
startNum, path, limit := utils.MustArchiveFlags(cmd.Flags(), cmdLogger)
gcsBucket, gcpCredentials := utils.MustGcsFlags(cmd.Flags(), cmdLogger)
env := utils.GetEnvironmentDetails(isTest, isFuture)

ledgerTransaction, err := input.GetTransactions(startNum, endNum, limit, env)
if err != nil {
cmdLogger.Fatal("could not read ledger_transaction: ", err)
}

outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
for _, transformInput := range ledgerTransaction {
transformed, err := transform.TransformLedgerTransaction(transformInput.Transaction, transformInput.LedgerHistory)
if err != nil {
ledgerSeq := transformInput.LedgerHistory.Header.LedgerSeq
cmdLogger.LogError(fmt.Errorf("could not transform ledger_transaction transaction %d in ledger %d: ", transformInput.Transaction.Index, ledgerSeq))
numFailures += 1
continue
}

numBytes, err := exportEntry(transformed, outFile, extra)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not export transaction: %v", err))
numFailures += 1
continue
}
totalNumBytes += numBytes
}

outFile.Close()
cmdLogger.Info("Number of bytes written: ", totalNumBytes)

printTransformStats(len(ledgerTransaction), numFailures)

maybeUpload(gcpCredentials, gcsBucket, path)
},
}

func init() {
rootCmd.AddCommand(ledgerTransactionCmd)
utils.AddCommonFlags(ledgerTransactionCmd.Flags())
utils.AddArchiveFlags("ledger_transaction", ledgerTransactionCmd.Flags())
utils.AddGcsFlags(ledgerTransactionCmd.Flags())
ledgerTransactionCmd.MarkFlagRequired("end-ledger")

/*
Current flags:
start-ledger: the ledger sequence number for the beginning of the export period
end-ledger: the ledger sequence number for the end of the export range (*required)

limit: maximum number of ledger_transaction to export
TODO: measure a good default value that ensures all ledger_transaction within a 5 minute period will be exported with a single call
The current max_ledger_transaction_set_size is 1000 and there are 60 new ledgers in a 5 minute period:
1000*60 = 60000

output-file: filename of the output file

TODO: implement extra flags if possible
serialize-method: the method for serialization of the output data (JSON, XDR, etc)
start and end time as a replacement for start and end sequence numbers
*/
}
58 changes: 58 additions & 0 deletions internal/transform/ledger_transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package transform

import (
"fmt"

"github.com/stellar/stellar-etl/internal/utils"

"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"
)

// TransformTransaction converts a transaction from the history archive ingestion system into a form suitable for BigQuery
func TransformLedgerTransaction(transaction ingest.LedgerTransaction, lhe xdr.LedgerHeaderHistoryEntry) (LedgerTransactionOutput, error) {
ledgerHeader := lhe.Header
outputLedgerSequence := uint32(ledgerHeader.LedgerSeq)

outputTxEnvelope, err := xdr.MarshalBase64(transaction.Envelope)
if err != nil {
return LedgerTransactionOutput{}, err
}

outputTxResult, err := xdr.MarshalBase64(&transaction.Result)
if err != nil {
return LedgerTransactionOutput{}, err
}

outputTxMeta, err := xdr.MarshalBase64(transaction.UnsafeMeta)
if err != nil {
return LedgerTransactionOutput{}, err
}

outputTxFeeMeta, err := xdr.MarshalBase64(transaction.FeeChanges)
if err != nil {
return LedgerTransactionOutput{}, err
}

outputTxLedgerHistory, err := xdr.MarshalBase64(lhe)
if err != nil {
return LedgerTransactionOutput{}, err
}

outputCloseTime, err := utils.TimePointToUTCTimeStamp(ledgerHeader.ScpValue.CloseTime)
if err != nil {
return LedgerTransactionOutput{}, fmt.Errorf("could not convert close time to UTC timestamp: %v", err)
}

transformedLedgerTransaction := LedgerTransactionOutput{
LedgerSequence: outputLedgerSequence,
TxEnvelope: outputTxEnvelope,
TxResult: outputTxResult,
TxMeta: outputTxMeta,
TxFeeMeta: outputTxFeeMeta,
TxLedgerHistory: outputTxLedgerHistory,
ClosedAt: outputCloseTime,
}

return transformedLedgerTransaction, nil
}
Loading
Loading