diff --git a/blocks/solana-parquet/proto/solana.rawblocks.proto b/blocks/solana-parquet/proto/solana.rawblocks.proto index a16c5c7..2c090be 100644 --- a/blocks/solana-parquet/proto/solana.rawblocks.proto +++ b/blocks/solana-parquet/proto/solana.rawblocks.proto @@ -109,7 +109,7 @@ message InstructionCall { uint32 tx_index = 9; string tx_signer = 10; bool tx_success = 11; - string log_messages = 12; + repeated string log_messages = 12; // instruction uint32 outer_instruction_index = 13; @@ -120,7 +120,8 @@ message InstructionCall { bool is_inner = 18; string data = 19; repeated string account_arguments = 20; - repeated string inner_instructions = 21; + // string representing a nested array. Switch to nested array type when supported by sink-files + string inner_instructions = 21; } message AccountActivity { diff --git a/blocks/solana-parquet/src/instruction_calls.rs b/blocks/solana-parquet/src/instruction_calls.rs index b04bb93..f3ce880 100644 --- a/blocks/solana-parquet/src/instruction_calls.rs +++ b/blocks/solana-parquet/src/instruction_calls.rs @@ -11,69 +11,101 @@ use substreams_solana::{ use crate::{ blocks::insert_blockinfo, keys::{inner_instruction_keys, instruction_keys}, + pb::solana::rawblocks::InstructionCall, + structs::{BlockInfo, BlockTimestamp}, utils::insert_timestamp_without_number, }; -pub fn insert_instruction_calls(tables: &mut DatabaseChanges, clock: &Clock, block: &Block, transaction: &ConfirmedTransaction, tx_info: &TxInfo, table_prefix: &str) { - for (instruction_index, instruction_view) in transaction.walk_instructions().enumerate() { - if !instruction_view.is_root() { - continue; - } +pub fn collect_instruction_calls(block: &Block, timestamp: &BlockTimestamp, block_info: &BlockInfo) -> Vec { + let mut vec: Vec = vec![]; + + for (index, transaction) in block.transactions.iter().enumerate() { + let message = transaction.transaction.as_ref().expect("Transaction is missing").message.as_ref().expect("Message is missing"); + let signer = base58::encode(message.account_keys.first().expect("Signer is missing")); + + let tx_info = TxInfo { + tx_id: transaction.id().to_string(), + tx_index: index as u32, + tx_signer: signer, + tx_success: transaction.meta.as_ref().unwrap().err.is_none(), + log_messages: transaction.meta.as_ref().unwrap().log_messages.clone(), + }; + + for (instruction_index, instruction_view) in transaction.walk_instructions().enumerate() { + if !instruction_view.is_root() { + continue; + } - insert_outer_instruction(tables, clock, block, tx_info, instruction_index, &instruction_view, table_prefix); - insert_inner_instructions(tables, clock, block, tx_info, instruction_index, &instruction_view, table_prefix); + collect_outer_instruction(&mut vec, timestamp, block_info, &tx_info, instruction_index, &instruction_view); + collect_inner_instructions(&mut vec, timestamp, block_info, &tx_info, instruction_index, &instruction_view); + } } + + vec } -fn insert_outer_instruction(tables: &mut DatabaseChanges, clock: &Clock, block: &Block, tx_info: &TxInfo, instruction_index: usize, instruction_view: &InstructionView, table_prefix: &str) { +fn collect_outer_instruction(vec: &mut Vec, timestamp: &BlockTimestamp, block_info: &BlockInfo, tx_info: &TxInfo, instruction_index: usize, instruction_view: &InstructionView) { let executing_account = base58::encode(instruction_view.program_id()); - let account_arguments = to_string_array_to_string(&instruction_view.accounts()); + // let account_arguments = to_string_array_to_string(&instruction_view.accounts()); + let account_arguments = instruction_view.accounts().iter().map(|arg| arg.to_string()).collect(); let data = bytes_to_hex(&instruction_view.data()); - let keys = instruction_keys(tx_info.tx_id, instruction_index.to_string().as_str()); - let inner_instructions_str = build_inner_instructions_str(instruction_view); - let row = tables - .push_change_composite(format!("{}instruction_calls", table_prefix), keys, 0, table_change::Operation::Create) - .change("outer_instruction_index", ("", instruction_index.to_string().as_str())) - .change("outer_executing_account", ("", executing_account.as_str())) - .change("inner_instruction_index", ("", "-1")) - .change("inner_executing_account", ("", "")) - .change("executing_account", ("", executing_account.as_str())) - .change("is_inner", ("", "false")) - .change("data", ("", data.as_str())) - .change("account_arguments", ("", account_arguments.as_str())) - .change("inner_instructions", ("", inner_instructions_str.as_str())); - - insert_timestamp_without_number(row, clock, false, true); - insert_blockinfo(row, block, true); - insert_tx_info(row, tx_info); + vec.push(InstructionCall { + block_time: Some(timestamp.time), + block_hash: timestamp.hash.clone(), + block_date: timestamp.date.clone(), + block_slot: block_info.slot, + block_height: block_info.height, + block_previous_block_hash: block_info.previous_block_hash.clone(), + block_parent_slot: block_info.parent_slot, + tx_id: tx_info.tx_id.clone(), + tx_index: tx_info.tx_index, + tx_signer: tx_info.tx_signer.to_string(), + tx_success: tx_info.tx_success, + log_messages: tx_info.log_messages.clone(), + outer_instruction_index: instruction_index as u32, + inner_instruction_index: -1, + inner_executing_account: "".to_string(), + outer_executing_account: executing_account.clone(), + executing_account, + is_inner: false, + data, + account_arguments: account_arguments, + inner_instructions: inner_instructions_str, + }); } -fn insert_inner_instructions(tables: &mut DatabaseChanges, clock: &Clock, block: &Block, tx_info: &TxInfo, instruction_index: usize, instruction_view: &InstructionView, table_prefix: &str) { +fn collect_inner_instructions(vec: &mut Vec, timestamp: &BlockTimestamp, block_info: &BlockInfo, tx_info: &TxInfo, instruction_index: usize, instruction_view: &InstructionView) { for (inner_index, inner_instruction) in instruction_view.inner_instructions().enumerate() { let inner_data = bytes_to_hex(inner_instruction.data()); let executing_account = inner_instruction.program_id().to_string(); - let account_arguments = to_string_array_to_string(&inner_instruction.accounts()); - - let keys = inner_instruction_keys(tx_info.tx_id, instruction_index.to_string().as_str(), inner_index.to_string().as_str()); - - let row = tables - .push_change_composite(format!("{}instruction_calls", table_prefix), keys, 0, table_change::Operation::Create) - .change("outer_instruction_index", ("", instruction_index.to_string().as_str())) - .change("inner_instruction_index", ("", inner_index.to_string().as_str())) - .change("inner_executing_account", ("", executing_account.as_str())) - .change("outer_executing_account", ("", instruction_view.program_id().to_string().as_str())) - .change("executing_account", ("", executing_account.as_str())) - .change("is_inner", ("", "true")) - .change("data", ("", inner_data.as_str())) - .change("account_arguments", ("", account_arguments.as_str())) - .change("inner_instructions", ("", "[]")); - - insert_timestamp_without_number(row, clock, false, true); - insert_blockinfo(row, block, true); - insert_tx_info(row, tx_info); + let account_arguments = inner_instruction.accounts().iter().map(|arg| arg.to_string()).collect(); + + vec.push(InstructionCall { + block_time: Some(timestamp.time), + block_hash: timestamp.hash.clone(), + block_date: timestamp.date.clone(), + block_slot: block_info.slot, + block_height: block_info.height, + block_previous_block_hash: block_info.previous_block_hash.clone(), + block_parent_slot: block_info.parent_slot, + tx_id: tx_info.tx_id.clone(), + tx_index: tx_info.tx_index, + tx_signer: tx_info.tx_signer.to_string(), + tx_success: tx_info.tx_success, + log_messages: tx_info.log_messages.clone(), + outer_instruction_index: instruction_index as u32, + inner_instruction_index: inner_index as i32, + inner_executing_account: executing_account.clone(), + outer_executing_account: instruction_view.program_id().to_string(), + executing_account, + is_inner: true, + data: inner_data, + account_arguments, + inner_instructions: "".to_string(), + }); } } @@ -92,18 +124,10 @@ fn build_inner_instructions_str(instruction_view: &InstructionView) -> String { serde_json::to_string(&inner_instructions).expect("Failed to serialize inner instructions") } -fn insert_tx_info(row: &mut TableChange, tx_info: &TxInfo) { - row.change("tx_id", ("", tx_info.tx_id)) - .change("tx_index", ("", tx_info.tx_index)) - .change("tx_signer", ("", tx_info.tx_signer)) - .change("tx_success", ("", tx_info.tx_success)) - .change("log_messages", ("", tx_info.log_messages)); -} - -pub struct TxInfo<'a> { - pub tx_id: &'a str, - pub tx_index: &'a str, - pub tx_signer: &'a str, - pub tx_success: &'a str, - pub log_messages: &'a str, +pub struct TxInfo { + pub tx_id: String, + pub tx_index: u32, + pub tx_signer: String, + pub tx_success: bool, + pub log_messages: Vec, } diff --git a/blocks/solana-parquet/src/pb/solana.rawblocks.rs b/blocks/solana-parquet/src/pb/solana.rawblocks.rs index 1f09106..d140ed5 100644 --- a/blocks/solana-parquet/src/pb/solana.rawblocks.rs +++ b/blocks/solana-parquet/src/pb/solana.rawblocks.rs @@ -175,8 +175,8 @@ pub struct InstructionCall { pub tx_signer: ::prost::alloc::string::String, #[prost(bool, tag="11")] pub tx_success: bool, - #[prost(string, tag="12")] - pub log_messages: ::prost::alloc::string::String, + #[prost(string, repeated, tag="12")] + pub log_messages: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, /// instruction #[prost(uint32, tag="13")] pub outer_instruction_index: u32, @@ -194,8 +194,9 @@ pub struct InstructionCall { pub data: ::prost::alloc::string::String, #[prost(string, repeated, tag="20")] pub account_arguments: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, - #[prost(string, repeated, tag="21")] - pub inner_instructions: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + /// string representing a nested array. Switch to nested array type when supported by sink-files + #[prost(string, tag="21")] + pub inner_instructions: ::prost::alloc::string::String, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] diff --git a/blocks/solana-parquet/src/sinks.rs b/blocks/solana-parquet/src/sinks.rs index 9eb6798..9cdb097 100644 --- a/blocks/solana-parquet/src/sinks.rs +++ b/blocks/solana-parquet/src/sinks.rs @@ -1,10 +1,10 @@ use substreams::errors::Error; use substreams::pb::substreams::Clock; -use substreams_database_change::pb::database::DatabaseChanges; use substreams_solana::b58; use substreams_solana::pb::sf::solana::r#type::v1::{Block, ConfirmedTransaction}; use crate::blocks::{collect_block, get_block_info}; +use crate::instruction_calls::collect_instruction_calls; use crate::pb::solana::rawblocks::Events; use crate::rewards::collect_rewards; use crate::transactions::collect_transactions; @@ -28,10 +28,10 @@ pub fn ch_out_without_votes(clock: Clock, block: Block) -> Result blocks: vec![collect_block(&block, ×tamp, &block_info).unwrap()], rewards: collect_rewards(&block, ×tamp, &block_info), transactions: collect_transactions(&non_vote_trx, &block_info, ×tamp), - instruction_calls: vec![], + instruction_calls: collect_instruction_calls(&block, ×tamp, &block_info), account_activity: vec![], vote_transactions: collect_transactions(&vote_trx, &block_info, ×tamp), - vote_instruction_calls: vec![], + vote_instruction_calls: collect_instruction_calls(&block, ×tamp, &block_info), vote_account_activity: vec![], }) } diff --git a/blocks/solana-parquet/src/transactions.rs b/blocks/solana-parquet/src/transactions.rs index 814c488..5da8bca 100644 --- a/blocks/solana-parquet/src/transactions.rs +++ b/blocks/solana-parquet/src/transactions.rs @@ -1,84 +1,13 @@ -use common::utils::{number_array_to_string, string_array_to_string, string_array_to_string_with_escapes}; -use substreams::pb::substreams::Clock; -use substreams_database_change::pb::database::{table_change, DatabaseChanges}; -use substreams_solana::{ - base58, - pb::sf::solana::r#type::v1::{Block, ConfirmedTransaction}, -}; +use common::utils::string_array_to_string_with_escapes; +use substreams_solana::{base58, pb::sf::solana::r#type::v1::ConfirmedTransaction}; use crate::{ - blocks::insert_blockinfo, - instruction_calls::{insert_instruction_calls, TxInfo}, pb::solana::rawblocks::Transaction as RawTransaction, structs::{BlockInfo, BlockTimestamp}, tx_errors::TransactionErrorDecoder, - utils::{build_csv_string, encode_byte_vectors_to_base58_string, get_account_keys_extended, insert_timestamp_without_number}, + utils::get_account_keys_extended, }; -pub fn insert_transactions(tables: &mut DatabaseChanges, clock: &Clock, block: &Block, transactions: &Vec<(usize, &ConfirmedTransaction)>, table_prefix: &str) { - for (index, transaction) in transactions { - let meta = transaction.meta.as_ref().expect("Transaction meta is missing"); - let trx = transaction.transaction.as_ref().expect("Transaction is missing"); - let message = trx.message.as_ref().expect("Transaction message is missing"); - let header = message.header.as_ref().expect("Transaction header is missing"); - - let account_keys = string_array_to_string(&get_account_keys_extended(transaction)); - - let success = meta.err.is_none(); - - let err = match &meta.err { - Some(err) => decode_transaction_error(&err.err), - None => String::new(), - }; - - let signatures = encode_byte_vectors_to_base58_string(&trx.signatures); - let first_signature = transaction.id(); - - let recent_block_hash = base58::encode(&message.recent_blockhash); - let log_messages = string_array_to_string_with_escapes(&meta.log_messages); - // let log_messages = build_csv_string(&meta.log_messages); - let pre_balances = number_array_to_string(&meta.pre_balances); - let post_balances = number_array_to_string(&meta.post_balances); - let signers: Vec = message.account_keys.iter().take(trx.signatures.len()).map(|key| base58::encode(key)).collect(); - let signers_str = string_array_to_string(&signers); - let signer = signers.first().unwrap(); - let index_str = index.to_string(); - - let row = tables - .push_change(format!("{}transactions", table_prefix), &first_signature, 0, table_change::Operation::Create) - .change("id", ("", first_signature.as_str())) - .change("index", ("", index_str.as_str())) - .change("fee", ("", meta.fee.to_string().as_str())) - .change("required_signatures", ("", header.num_required_signatures.to_string().as_str())) - .change("required_signed_accounts", ("", header.num_readonly_signed_accounts.to_string().as_str())) - .change("required_unsigned_accounts", ("", header.num_readonly_unsigned_accounts.to_string().as_str())) - .change("signature", ("", first_signature.as_str())) - .change("success", ("", success.to_string().as_str())) - .change("error", ("", err.as_str())) - .change("recent_block_hash", ("", recent_block_hash.as_str())) - .change("account_keys", ("", account_keys.as_str())) - .change("log_messages", ("", log_messages.as_str())) - .change("pre_balances", ("", pre_balances.as_str())) - .change("post_balances", ("", post_balances.as_str())) - .change("signatures", ("", signatures.as_str())) - .change("signer", ("", signer.as_str())) - .change("signers", ("", signers_str.as_str())); - - insert_timestamp_without_number(row, clock, false, true); - insert_blockinfo(row, block, true); - - let tx_info = TxInfo { - tx_id: &first_signature, - tx_index: &index_str, - tx_signer: &signer, - tx_success: &success.to_string(), - log_messages: &log_messages, - }; - - insert_instruction_calls(tables, clock, block, transaction, &tx_info, table_prefix); - } -} - pub fn collect_transactions(transactions: &Vec<(usize, &ConfirmedTransaction)>, block_info: &BlockInfo, timestamp: &BlockTimestamp) -> Vec { let mut trx_vec: Vec = Vec::new();