Skip to content

Commit

Permalink
Add instruction_calls
Browse files Browse the repository at this point in the history
  • Loading branch information
zolting committed Nov 14, 2024
1 parent 48bd765 commit 033d801
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 143 deletions.
5 changes: 3 additions & 2 deletions blocks/solana-parquet/proto/solana.rawblocks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ message InstructionCall {
uint32 tx_index = 9;
string tx_signer = 10;
bool tx_success = 11;
string log_messages = 12;
repeated string log_messages = 12;

// instruction
uint32 outer_instruction_index = 13;
Expand All @@ -120,7 +120,8 @@ message InstructionCall {
bool is_inner = 18;
string data = 19;
repeated string account_arguments = 20;
repeated string inner_instructions = 21;
// string representing a nested array. Switch to nested array type when supported by sink-files
string inner_instructions = 21;
}

message AccountActivity {
Expand Down
144 changes: 84 additions & 60 deletions blocks/solana-parquet/src/instruction_calls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,69 +11,101 @@ use substreams_solana::{
use crate::{
blocks::insert_blockinfo,
keys::{inner_instruction_keys, instruction_keys},
pb::solana::rawblocks::InstructionCall,
structs::{BlockInfo, BlockTimestamp},
utils::insert_timestamp_without_number,
};

pub fn insert_instruction_calls(tables: &mut DatabaseChanges, clock: &Clock, block: &Block, transaction: &ConfirmedTransaction, tx_info: &TxInfo, table_prefix: &str) {
for (instruction_index, instruction_view) in transaction.walk_instructions().enumerate() {
if !instruction_view.is_root() {
continue;
}
pub fn collect_instruction_calls(block: &Block, timestamp: &BlockTimestamp, block_info: &BlockInfo) -> Vec<InstructionCall> {
let mut vec: Vec<InstructionCall> = vec![];

for (index, transaction) in block.transactions.iter().enumerate() {
let message = transaction.transaction.as_ref().expect("Transaction is missing").message.as_ref().expect("Message is missing");
let signer = base58::encode(message.account_keys.first().expect("Signer is missing"));

let tx_info = TxInfo {
tx_id: transaction.id().to_string(),
tx_index: index as u32,
tx_signer: signer,
tx_success: transaction.meta.as_ref().unwrap().err.is_none(),
log_messages: transaction.meta.as_ref().unwrap().log_messages.clone(),
};

for (instruction_index, instruction_view) in transaction.walk_instructions().enumerate() {
if !instruction_view.is_root() {
continue;
}

insert_outer_instruction(tables, clock, block, tx_info, instruction_index, &instruction_view, table_prefix);
insert_inner_instructions(tables, clock, block, tx_info, instruction_index, &instruction_view, table_prefix);
collect_outer_instruction(&mut vec, timestamp, block_info, &tx_info, instruction_index, &instruction_view);
collect_inner_instructions(&mut vec, timestamp, block_info, &tx_info, instruction_index, &instruction_view);
}
}

vec
}

fn insert_outer_instruction(tables: &mut DatabaseChanges, clock: &Clock, block: &Block, tx_info: &TxInfo, instruction_index: usize, instruction_view: &InstructionView, table_prefix: &str) {
fn collect_outer_instruction(vec: &mut Vec<InstructionCall>, timestamp: &BlockTimestamp, block_info: &BlockInfo, tx_info: &TxInfo, instruction_index: usize, instruction_view: &InstructionView) {
let executing_account = base58::encode(instruction_view.program_id());
let account_arguments = to_string_array_to_string(&instruction_view.accounts());
// let account_arguments = to_string_array_to_string(&instruction_view.accounts());
let account_arguments = instruction_view.accounts().iter().map(|arg| arg.to_string()).collect();
let data = bytes_to_hex(&instruction_view.data());

let keys = instruction_keys(tx_info.tx_id, instruction_index.to_string().as_str());

let inner_instructions_str = build_inner_instructions_str(instruction_view);

let row = tables
.push_change_composite(format!("{}instruction_calls", table_prefix), keys, 0, table_change::Operation::Create)
.change("outer_instruction_index", ("", instruction_index.to_string().as_str()))
.change("outer_executing_account", ("", executing_account.as_str()))
.change("inner_instruction_index", ("", "-1"))
.change("inner_executing_account", ("", ""))
.change("executing_account", ("", executing_account.as_str()))
.change("is_inner", ("", "false"))
.change("data", ("", data.as_str()))
.change("account_arguments", ("", account_arguments.as_str()))
.change("inner_instructions", ("", inner_instructions_str.as_str()));

insert_timestamp_without_number(row, clock, false, true);
insert_blockinfo(row, block, true);
insert_tx_info(row, tx_info);
vec.push(InstructionCall {
block_time: Some(timestamp.time),
block_hash: timestamp.hash.clone(),
block_date: timestamp.date.clone(),
block_slot: block_info.slot,
block_height: block_info.height,
block_previous_block_hash: block_info.previous_block_hash.clone(),
block_parent_slot: block_info.parent_slot,
tx_id: tx_info.tx_id.clone(),
tx_index: tx_info.tx_index,
tx_signer: tx_info.tx_signer.to_string(),
tx_success: tx_info.tx_success,
log_messages: tx_info.log_messages.clone(),
outer_instruction_index: instruction_index as u32,
inner_instruction_index: -1,
inner_executing_account: "".to_string(),
outer_executing_account: executing_account.clone(),
executing_account,
is_inner: false,
data,
account_arguments: account_arguments,
inner_instructions: inner_instructions_str,
});
}

fn insert_inner_instructions(tables: &mut DatabaseChanges, clock: &Clock, block: &Block, tx_info: &TxInfo, instruction_index: usize, instruction_view: &InstructionView, table_prefix: &str) {
fn collect_inner_instructions(vec: &mut Vec<InstructionCall>, timestamp: &BlockTimestamp, block_info: &BlockInfo, tx_info: &TxInfo, instruction_index: usize, instruction_view: &InstructionView) {
for (inner_index, inner_instruction) in instruction_view.inner_instructions().enumerate() {
let inner_data = bytes_to_hex(inner_instruction.data());
let executing_account = inner_instruction.program_id().to_string();
let account_arguments = to_string_array_to_string(&inner_instruction.accounts());

let keys = inner_instruction_keys(tx_info.tx_id, instruction_index.to_string().as_str(), inner_index.to_string().as_str());

let row = tables
.push_change_composite(format!("{}instruction_calls", table_prefix), keys, 0, table_change::Operation::Create)
.change("outer_instruction_index", ("", instruction_index.to_string().as_str()))
.change("inner_instruction_index", ("", inner_index.to_string().as_str()))
.change("inner_executing_account", ("", executing_account.as_str()))
.change("outer_executing_account", ("", instruction_view.program_id().to_string().as_str()))
.change("executing_account", ("", executing_account.as_str()))
.change("is_inner", ("", "true"))
.change("data", ("", inner_data.as_str()))
.change("account_arguments", ("", account_arguments.as_str()))
.change("inner_instructions", ("", "[]"));

insert_timestamp_without_number(row, clock, false, true);
insert_blockinfo(row, block, true);
insert_tx_info(row, tx_info);
let account_arguments = inner_instruction.accounts().iter().map(|arg| arg.to_string()).collect();

vec.push(InstructionCall {
block_time: Some(timestamp.time),
block_hash: timestamp.hash.clone(),
block_date: timestamp.date.clone(),
block_slot: block_info.slot,
block_height: block_info.height,
block_previous_block_hash: block_info.previous_block_hash.clone(),
block_parent_slot: block_info.parent_slot,
tx_id: tx_info.tx_id.clone(),
tx_index: tx_info.tx_index,
tx_signer: tx_info.tx_signer.to_string(),
tx_success: tx_info.tx_success,
log_messages: tx_info.log_messages.clone(),
outer_instruction_index: instruction_index as u32,
inner_instruction_index: inner_index as i32,
inner_executing_account: executing_account.clone(),
outer_executing_account: instruction_view.program_id().to_string(),
executing_account,
is_inner: true,
data: inner_data,
account_arguments,
inner_instructions: "".to_string(),
});
}
}

Expand All @@ -92,18 +124,10 @@ fn build_inner_instructions_str(instruction_view: &InstructionView) -> String {
serde_json::to_string(&inner_instructions).expect("Failed to serialize inner instructions")
}

fn insert_tx_info(row: &mut TableChange, tx_info: &TxInfo) {
row.change("tx_id", ("", tx_info.tx_id))
.change("tx_index", ("", tx_info.tx_index))
.change("tx_signer", ("", tx_info.tx_signer))
.change("tx_success", ("", tx_info.tx_success))
.change("log_messages", ("", tx_info.log_messages));
}

pub struct TxInfo<'a> {
pub tx_id: &'a str,
pub tx_index: &'a str,
pub tx_signer: &'a str,
pub tx_success: &'a str,
pub log_messages: &'a str,
pub struct TxInfo {
pub tx_id: String,
pub tx_index: u32,
pub tx_signer: String,
pub tx_success: bool,
pub log_messages: Vec<String>,
}
9 changes: 5 additions & 4 deletions blocks/solana-parquet/src/pb/solana.rawblocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ pub struct InstructionCall {
pub tx_signer: ::prost::alloc::string::String,
#[prost(bool, tag="11")]
pub tx_success: bool,
#[prost(string, tag="12")]
pub log_messages: ::prost::alloc::string::String,
#[prost(string, repeated, tag="12")]
pub log_messages: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
/// instruction
#[prost(uint32, tag="13")]
pub outer_instruction_index: u32,
Expand All @@ -194,8 +194,9 @@ pub struct InstructionCall {
pub data: ::prost::alloc::string::String,
#[prost(string, repeated, tag="20")]
pub account_arguments: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(string, repeated, tag="21")]
pub inner_instructions: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
/// string representing a nested array. Switch to nested array type when supported by sink-files
#[prost(string, tag="21")]
pub inner_instructions: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
Expand Down
6 changes: 3 additions & 3 deletions blocks/solana-parquet/src/sinks.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use substreams::errors::Error;
use substreams::pb::substreams::Clock;
use substreams_database_change::pb::database::DatabaseChanges;
use substreams_solana::b58;
use substreams_solana::pb::sf::solana::r#type::v1::{Block, ConfirmedTransaction};

use crate::blocks::{collect_block, get_block_info};
use crate::instruction_calls::collect_instruction_calls;
use crate::pb::solana::rawblocks::Events;
use crate::rewards::collect_rewards;
use crate::transactions::collect_transactions;
Expand All @@ -28,10 +28,10 @@ pub fn ch_out_without_votes(clock: Clock, block: Block) -> Result<Events, Error>
blocks: vec![collect_block(&block, &timestamp, &block_info).unwrap()],
rewards: collect_rewards(&block, &timestamp, &block_info),
transactions: collect_transactions(&non_vote_trx, &block_info, &timestamp),
instruction_calls: vec![],
instruction_calls: collect_instruction_calls(&block, &timestamp, &block_info),
account_activity: vec![],
vote_transactions: collect_transactions(&vote_trx, &block_info, &timestamp),
vote_instruction_calls: vec![],
vote_instruction_calls: collect_instruction_calls(&block, &timestamp, &block_info),
vote_account_activity: vec![],
})
}
Expand Down
77 changes: 3 additions & 74 deletions blocks/solana-parquet/src/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,84 +1,13 @@
use common::utils::{number_array_to_string, string_array_to_string, string_array_to_string_with_escapes};
use substreams::pb::substreams::Clock;
use substreams_database_change::pb::database::{table_change, DatabaseChanges};
use substreams_solana::{
base58,
pb::sf::solana::r#type::v1::{Block, ConfirmedTransaction},
};
use common::utils::string_array_to_string_with_escapes;
use substreams_solana::{base58, pb::sf::solana::r#type::v1::ConfirmedTransaction};

use crate::{
blocks::insert_blockinfo,
instruction_calls::{insert_instruction_calls, TxInfo},
pb::solana::rawblocks::Transaction as RawTransaction,
structs::{BlockInfo, BlockTimestamp},
tx_errors::TransactionErrorDecoder,
utils::{build_csv_string, encode_byte_vectors_to_base58_string, get_account_keys_extended, insert_timestamp_without_number},
utils::get_account_keys_extended,
};

pub fn insert_transactions(tables: &mut DatabaseChanges, clock: &Clock, block: &Block, transactions: &Vec<(usize, &ConfirmedTransaction)>, table_prefix: &str) {
for (index, transaction) in transactions {
let meta = transaction.meta.as_ref().expect("Transaction meta is missing");
let trx = transaction.transaction.as_ref().expect("Transaction is missing");
let message = trx.message.as_ref().expect("Transaction message is missing");
let header = message.header.as_ref().expect("Transaction header is missing");

let account_keys = string_array_to_string(&get_account_keys_extended(transaction));

let success = meta.err.is_none();

let err = match &meta.err {
Some(err) => decode_transaction_error(&err.err),
None => String::new(),
};

let signatures = encode_byte_vectors_to_base58_string(&trx.signatures);
let first_signature = transaction.id();

let recent_block_hash = base58::encode(&message.recent_blockhash);
let log_messages = string_array_to_string_with_escapes(&meta.log_messages);
// let log_messages = build_csv_string(&meta.log_messages);
let pre_balances = number_array_to_string(&meta.pre_balances);
let post_balances = number_array_to_string(&meta.post_balances);
let signers: Vec<String> = message.account_keys.iter().take(trx.signatures.len()).map(|key| base58::encode(key)).collect();
let signers_str = string_array_to_string(&signers);
let signer = signers.first().unwrap();
let index_str = index.to_string();

let row = tables
.push_change(format!("{}transactions", table_prefix), &first_signature, 0, table_change::Operation::Create)
.change("id", ("", first_signature.as_str()))
.change("index", ("", index_str.as_str()))
.change("fee", ("", meta.fee.to_string().as_str()))
.change("required_signatures", ("", header.num_required_signatures.to_string().as_str()))
.change("required_signed_accounts", ("", header.num_readonly_signed_accounts.to_string().as_str()))
.change("required_unsigned_accounts", ("", header.num_readonly_unsigned_accounts.to_string().as_str()))
.change("signature", ("", first_signature.as_str()))
.change("success", ("", success.to_string().as_str()))
.change("error", ("", err.as_str()))
.change("recent_block_hash", ("", recent_block_hash.as_str()))
.change("account_keys", ("", account_keys.as_str()))
.change("log_messages", ("", log_messages.as_str()))
.change("pre_balances", ("", pre_balances.as_str()))
.change("post_balances", ("", post_balances.as_str()))
.change("signatures", ("", signatures.as_str()))
.change("signer", ("", signer.as_str()))
.change("signers", ("", signers_str.as_str()));

insert_timestamp_without_number(row, clock, false, true);
insert_blockinfo(row, block, true);

let tx_info = TxInfo {
tx_id: &first_signature,
tx_index: &index_str,
tx_signer: &signer,
tx_success: &success.to_string(),
log_messages: &log_messages,
};

insert_instruction_calls(tables, clock, block, transaction, &tx_info, table_prefix);
}
}

pub fn collect_transactions(transactions: &Vec<(usize, &ConfirmedTransaction)>, block_info: &BlockInfo, timestamp: &BlockTimestamp) -> Vec<RawTransaction> {
let mut trx_vec: Vec<RawTransaction> = Vec::new();

Expand Down

0 comments on commit 033d801

Please sign in to comment.