Skip to content

Commit

Permalink
Add Inputs (makes parquet sink crash somehow)
Browse files Browse the repository at this point in the history
  • Loading branch information
zolting committed Nov 22, 2024
1 parent 6cbb36a commit 5a5fa0f
Show file tree
Hide file tree
Showing 10 changed files with 314 additions and 101 deletions.
2 changes: 1 addition & 1 deletion blocks/bitcoin/src/blocks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use common::structs::BlockTimestamp;
use substreams_bitcoin::pb::btc::v1::Block;

use crate::pb::bitcoin::Block as OutputBlock;
use crate::pb::pinax::bitcoin::Block as OutputBlock;

pub fn collect_block(block: &Block, timestamp: &BlockTimestamp) -> OutputBlock {
// Get the coinbase from the first transaction
Expand Down
3 changes: 2 additions & 1 deletion blocks/bitcoin/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use common::utils::build_timestamp;
use substreams::{errors::Error, pb::substreams::Clock};
use substreams_bitcoin::pb::btc::v1::Block;

use crate::{blocks::collect_block, inputs::collect_transaction_inputs, pb::bitcoin::Events, transactions::collect_transaction};
use crate::{blocks::collect_block, inputs::collect_transaction_inputs, outputs::collect_transaction_outputs, pb::pinax::bitcoin::Events, transactions::collect_transaction};

#[substreams::handlers::map]
pub fn map_events(clock: Clock, block: Block) -> Result<Events, Error> {
Expand All @@ -18,6 +18,7 @@ pub fn map_events(clock: Clock, block: Block) -> Result<Events, Error> {
for (index, transaction) in block.tx.iter().enumerate() {
events.transactions.push(collect_transaction(transaction, &timestamp, index as u32));
events.inputs.extend(collect_transaction_inputs(transaction, &timestamp));
events.outputs.extend(collect_transaction_outputs(transaction, &timestamp));
}

Ok(events)
Expand Down
16 changes: 8 additions & 8 deletions blocks/bitcoin/src/inputs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use common::structs::BlockTimestamp;
use substreams_bitcoin::pb::btc::v1::Transaction;

use crate::pb::bitcoin::Input;
use crate::pb::pinax::bitcoin::Input;

pub fn collect_transaction_inputs(transaction: &Transaction, timestamp: &BlockTimestamp) -> Vec<Input> {
let mut inputs = Vec::new();
Expand All @@ -18,18 +18,18 @@ pub fn collect_transaction_inputs(transaction: &Transaction, timestamp: &BlockTi
index: index as u32,
spent_block_height: 0, // TODO: Need to look up from previous tx
spent_tx_id: input.txid.clone(),
spent_output_number: input.vout as u64,
spent_output_number: 0,
value: 0.0, // TODO: Need to look up from previous tx
address: String::new(), // TODO: Need to look up from previous tx
r#type: String::new(), // TODO: Need to look up from previous tx
coinbase: input.coinbase.clone(),
is_coinbase: !input.coinbase.is_empty(),
script_asm: script_sig.map(|s| s.asm.clone()).unwrap_or_default(),
script_hex: script_sig.map(|s| s.hex.clone()).unwrap_or_default(),
script_desc: String::new(), // TODO: Need to investigate how Dune generates this
script_signature_asm: script_sig.map(|s| s.asm.clone()).unwrap_or_default(),
script_signature_hex: script_sig.map(|s| s.hex.clone()).unwrap_or_default(),
sequence: input.sequence as i64,
script_asm: script_sig.map(|s| s.asm.clone()),
script_hex: script_sig.map(|s| s.hex.clone()),
script_desc: None,
script_signature_asm: script_sig.map(|s| s.asm.clone()),
script_signature_hex: script_sig.map(|s| s.hex.clone()),
sequence: input.sequence,
witness_data: input.txinwitness.clone(),
});
}
Expand Down
28 changes: 28 additions & 0 deletions blocks/bitcoin/src/outputs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use common::structs::BlockTimestamp;
use substreams_bitcoin::pb::btc::v1::Transaction;

use crate::pb::pinax::bitcoin::Output;

pub fn collect_transaction_outputs(transaction: &Transaction, timestamp: &BlockTimestamp) -> Vec<Output> {
let mut outputs = Vec::new();

for (index, output) in transaction.vout.iter().enumerate() {
let script_pub_key = output.script_pub_key.as_ref().unwrap();

outputs.push(Output {
block_time: Some(timestamp.time),
block_date: timestamp.date.clone(),
block_height: timestamp.number as u32,
block_hash: timestamp.hash.clone(),
tx_id: transaction.txid.clone(),
index: index as u32,
value: output.value,
address: script_pub_key.address.clone(),
r#type: script_pub_key.r#type.clone(),
script_asm: script_pub_key.asm.clone(),
script_hex: script_pub_key.hex.clone(),
});
}

outputs
}
150 changes: 75 additions & 75 deletions blocks/bitcoin/src/pb/bitcoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,182 +3,182 @@
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Events {
#[prost(message, repeated, tag="1")]
#[prost(message, repeated, tag = "1")]
pub blocks: ::prost::alloc::vec::Vec<Block>,
#[prost(message, repeated, tag="2")]
#[prost(message, repeated, tag = "2")]
pub transactions: ::prost::alloc::vec::Vec<Transaction>,
#[prost(message, repeated, tag="3")]
#[prost(message, repeated, tag = "3")]
pub inputs: ::prost::alloc::vec::Vec<Input>,
#[prost(message, repeated, tag="4")]
#[prost(message, repeated, tag = "4")]
pub outputs: ::prost::alloc::vec::Vec<Output>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Block {
/// clock
#[prost(message, optional, tag="1")]
#[prost(message, optional, tag = "1")]
pub time: ::core::option::Option<::prost_types::Timestamp>,
#[prost(uint32, tag="2")]
#[prost(uint32, tag = "2")]
pub height: u32,
#[prost(string, tag="3")]
#[prost(string, tag = "3")]
pub date: ::prost::alloc::string::String,
#[prost(string, tag="4")]
#[prost(string, tag = "4")]
pub hash: ::prost::alloc::string::String,
/// block
#[prost(string, tag="5")]
#[prost(string, tag = "5")]
pub bits: ::prost::alloc::string::String,
#[prost(string, tag="6")]
#[prost(string, tag = "6")]
pub chainwork: ::prost::alloc::string::String,
#[prost(double, tag="7")]
#[prost(double, tag = "7")]
pub difficulty: f64,
#[prost(string, tag="8")]
#[prost(string, tag = "8")]
pub merkle_root: ::prost::alloc::string::String,
#[prost(uint64, tag="9")]
#[prost(uint64, tag = "9")]
pub transaction_count: u64,
#[prost(uint32, tag="10")]
#[prost(uint32, tag = "10")]
pub nonce: u32,
#[prost(string, tag="11")]
#[prost(string, tag = "11")]
pub coinbase: ::prost::alloc::string::String,
#[prost(string, tag="12")]
#[prost(string, tag = "12")]
pub previous_block_hash: ::prost::alloc::string::String,
#[prost(int32, tag="13")]
#[prost(int32, tag = "13")]
pub version: i32,
#[prost(int32, tag="14")]
#[prost(int32, tag = "14")]
pub weight: i32,
/// counters
#[prost(int32, tag="20")]
#[prost(int32, tag = "20")]
pub size: i32,
#[prost(int32, tag="21")]
#[prost(int32, tag = "21")]
pub stripped_size: i32,
#[prost(double, tag="22")]
#[prost(double, tag = "22")]
pub total_fees: f64,
#[prost(double, tag="23")]
#[prost(double, tag = "23")]
pub total_reward: f64,
#[prost(double, tag="24")]
#[prost(double, tag = "24")]
pub mint_reward: f64,
#[prost(uint32, tag="25")]
#[prost(uint32, tag = "25")]
pub total_inputs: u32,
#[prost(uint32, tag="26")]
#[prost(uint32, tag = "26")]
pub total_outputs: u32,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Transaction {
/// block
#[prost(message, optional, tag="1")]
#[prost(message, optional, tag = "1")]
pub block_time: ::core::option::Option<::prost_types::Timestamp>,
#[prost(string, tag="2")]
#[prost(string, tag = "2")]
pub block_date: ::prost::alloc::string::String,
#[prost(uint32, tag="3")]
#[prost(uint32, tag = "3")]
pub block_height: u32,
#[prost(string, tag="4")]
#[prost(string, tag = "4")]
pub block_hash: ::prost::alloc::string::String,
/// transaction
#[prost(uint32, tag="5")]
#[prost(uint32, tag = "5")]
pub index: u32,
#[prost(string, tag="6")]
#[prost(string, tag = "6")]
pub id: ::prost::alloc::string::String,
#[prost(uint32, tag="7")]
#[prost(uint32, tag = "7")]
pub lock_time: u32,
#[prost(int32, tag="8")]
#[prost(int32, tag = "8")]
pub size: i32,
#[prost(int32, tag="9")]
#[prost(int32, tag = "9")]
pub virtual_size: i32,
#[prost(string, tag="10")]
#[prost(string, tag = "10")]
pub coinbase: ::prost::alloc::string::String,
#[prost(bool, tag="11")]
#[prost(bool, tag = "11")]
pub is_coinbase: bool,
#[prost(int64, tag="12")]
#[prost(int64, tag = "12")]
pub version: i64,
#[prost(int32, tag="13")]
#[prost(int32, tag = "13")]
pub input_count: i32,
#[prost(int32, tag="14")]
#[prost(int32, tag = "14")]
pub output_count: i32,
#[prost(string, repeated, tag="15")]
#[prost(string, repeated, tag = "15")]
pub input_tx_ids: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
#[prost(uint32, repeated, tag="16")]
#[prost(uint32, repeated, tag = "16")]
pub input_output_indices: ::prost::alloc::vec::Vec<u32>,
#[prost(double, repeated, tag="17")]
#[prost(double, repeated, tag = "17")]
pub output_values: ::prost::alloc::vec::Vec<f64>,
#[prost(string, tag="18")]
#[prost(string, tag = "18")]
pub hex: ::prost::alloc::string::String,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Input {
/// block
#[prost(message, optional, tag="1")]
#[prost(message, optional, tag = "1")]
pub block_time: ::core::option::Option<::prost_types::Timestamp>,
#[prost(string, tag="2")]
#[prost(string, tag = "2")]
pub block_date: ::prost::alloc::string::String,
#[prost(uint32, tag="3")]
#[prost(uint32, tag = "3")]
pub block_height: u32,
#[prost(string, tag="4")]
#[prost(string, tag = "4")]
pub block_hash: ::prost::alloc::string::String,
/// transaction
#[prost(string, tag="5")]
#[prost(string, tag = "5")]
pub tx_id: ::prost::alloc::string::String,
/// input
#[prost(uint32, tag="6")]
#[prost(uint32, tag = "6")]
pub index: u32,
#[prost(uint32, tag="7")]
#[prost(uint32, tag = "7")]
pub spent_block_height: u32,
#[prost(string, tag="8")]
#[prost(string, tag = "8")]
pub spent_tx_id: ::prost::alloc::string::String,
#[prost(uint64, tag="9")]
#[prost(uint64, tag = "9")]
pub spent_output_number: u64,
#[prost(double, tag="10")]
#[prost(double, tag = "10")]
pub value: f64,
#[prost(string, tag="11")]
#[prost(string, tag = "11")]
pub address: ::prost::alloc::string::String,
#[prost(string, tag="12")]
#[prost(string, tag = "12")]
pub r#type: ::prost::alloc::string::String,
#[prost(string, tag="13")]
#[prost(string, tag = "13")]
pub coinbase: ::prost::alloc::string::String,
#[prost(bool, tag="14")]
#[prost(bool, tag = "14")]
pub is_coinbase: bool,
#[prost(string, tag="15")]
#[prost(string, tag = "15")]
pub script_asm: ::prost::alloc::string::String,
#[prost(string, tag="16")]
#[prost(string, tag = "16")]
pub script_hex: ::prost::alloc::string::String,
#[prost(string, tag="17")]
#[prost(string, tag = "17")]
pub script_desc: ::prost::alloc::string::String,
#[prost(string, tag="18")]
#[prost(string, tag = "18")]
pub script_signature_asm: ::prost::alloc::string::String,
#[prost(string, tag="19")]
#[prost(string, tag = "19")]
pub script_signature_hex: ::prost::alloc::string::String,
#[prost(int64, tag="20")]
#[prost(int64, tag = "20")]
pub sequence: i64,
#[prost(string, repeated, tag="21")]
#[prost(string, repeated, tag = "21")]
pub witness_data: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Output {
/// block
#[prost(message, optional, tag="1")]
#[prost(message, optional, tag = "1")]
pub block_time: ::core::option::Option<::prost_types::Timestamp>,
#[prost(string, tag="2")]
#[prost(string, tag = "2")]
pub block_date: ::prost::alloc::string::String,
#[prost(uint32, tag="3")]
#[prost(uint32, tag = "3")]
pub block_height: u32,
#[prost(string, tag="4")]
#[prost(string, tag = "4")]
pub block_hash: ::prost::alloc::string::String,
/// transaction
#[prost(string, tag="5")]
#[prost(string, tag = "5")]
pub tx_id: ::prost::alloc::string::String,
/// output
#[prost(uint32, tag="6")]
#[prost(uint32, tag = "6")]
pub index: u32,
#[prost(double, tag="7")]
#[prost(double, tag = "7")]
pub value: f64,
#[prost(string, tag="8")]
#[prost(string, tag = "8")]
pub address: ::prost::alloc::string::String,
#[prost(string, tag="9")]
#[prost(string, tag = "9")]
pub r#type: ::prost::alloc::string::String,
#[prost(string, tag="10")]
#[prost(string, tag = "10")]
pub script_asm: ::prost::alloc::string::String,
#[prost(string, tag="11")]
#[prost(string, tag = "11")]
pub script_hex: ::prost::alloc::string::String,
}
// @@protoc_insertion_point(module)
12 changes: 7 additions & 5 deletions blocks/bitcoin/src/pb/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
// @generated
// @@protoc_insertion_point(attribute:bitcoin)
pub mod bitcoin {
include!("bitcoin.rs");
// @@protoc_insertion_point(bitcoin)
}
// @@protoc_insertion_point(attribute:parquet)
pub mod parquet {
include!("parquet.rs");
// @@protoc_insertion_point(parquet)
}
pub mod pinax {
// @@protoc_insertion_point(attribute:pinax.bitcoin)
pub mod bitcoin {
include!("pinax.bitcoin.rs");
// @@protoc_insertion_point(pinax.bitcoin)
}
}
pub mod sf {
// @@protoc_insertion_point(attribute:sf.substreams)
pub mod substreams {
Expand Down
Loading

0 comments on commit 5a5fa0f

Please sign in to comment.