diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 33578180..3348e333 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -141,8 +141,9 @@ checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" [[package]] name = "aptos-indexer-processor-sdk" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=9ecd252ccff53023664562001dd04c2886488c0d#9ecd252ccff53023664562001dd04c2886488c0d" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=95d76e07dd66a20a0e50a9e6c559885bff7ab52b#95d76e07dd66a20a0e50a9e6c559885bff7ab52b" dependencies = [ + "ahash", "anyhow", "aptos-indexer-transaction-stream", "aptos-protos 1.3.1 (git+https://github.com/aptos-labs/aptos-core.git?rev=5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb)", @@ -173,7 +174,7 @@ dependencies = [ [[package]] name = "aptos-indexer-processor-sdk-server-framework" version = "1.0.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=9ecd252ccff53023664562001dd04c2886488c0d#9ecd252ccff53023664562001dd04c2886488c0d" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=95d76e07dd66a20a0e50a9e6c559885bff7ab52b#95d76e07dd66a20a0e50a9e6c559885bff7ab52b" dependencies = [ "anyhow", "aptos-indexer-processor-sdk", @@ -206,10 +207,10 @@ dependencies = [ [[package]] name = "aptos-indexer-transaction-stream" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=9ecd252ccff53023664562001dd04c2886488c0d#9ecd252ccff53023664562001dd04c2886488c0d" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=95d76e07dd66a20a0e50a9e6c559885bff7ab52b#95d76e07dd66a20a0e50a9e6c559885bff7ab52b" dependencies = [ "anyhow", - "aptos-moving-average 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=9ecd252ccff53023664562001dd04c2886488c0d)", + "aptos-moving-average 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=95d76e07dd66a20a0e50a9e6c559885bff7ab52b)", "aptos-protos 1.3.1 (git+https://github.com/aptos-labs/aptos-core.git?rev=5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb)", "chrono", "futures-util", @@ -234,7 +235,7 @@ dependencies = [ [[package]] name = "aptos-moving-average" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=9ecd252ccff53023664562001dd04c2886488c0d#9ecd252ccff53023664562001dd04c2886488c0d" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=95d76e07dd66a20a0e50a9e6c559885bff7ab52b#95d76e07dd66a20a0e50a9e6c559885bff7ab52b" dependencies = [ "chrono", ] @@ -2218,7 +2219,7 @@ dependencies = [ [[package]] name = "instrumented-channel" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=9ecd252ccff53023664562001dd04c2886488c0d#9ecd252ccff53023664562001dd04c2886488c0d" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=95d76e07dd66a20a0e50a9e6c559885bff7ab52b#95d76e07dd66a20a0e50a9e6c559885bff7ab52b" dependencies = [ "delegate", "derive_builder", @@ -4094,7 +4095,10 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" [[package]] name = "sample" version = "0.1.0" -source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=9ecd252ccff53023664562001dd04c2886488c0d#9ecd252ccff53023664562001dd04c2886488c0d" +source = "git+https://github.com/aptos-labs/aptos-indexer-processor-sdk.git?rev=95d76e07dd66a20a0e50a9e6c559885bff7ab52b#95d76e07dd66a20a0e50a9e6c559885bff7ab52b" +dependencies = [ + "tracing", +] [[package]] name = "schannel" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 0fc1bc96..be3a3c3e 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -30,8 +30,8 @@ testing-transactions = { path = "testing-transactions" } ahash = { version = "0.8.7", features = ["serde"] } anyhow = "1.0.86" -aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "9ecd252ccff53023664562001dd04c2886488c0d" } -aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "9ecd252ccff53023664562001dd04c2886488c0d" } +aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "95d76e07dd66a20a0e50a9e6c559885bff7ab52b" } +aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "95d76e07dd66a20a0e50a9e6c559885bff7ab52b" } aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb" } aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "202bdccff2b2d333a385ae86a4fcf23e89da9f62" } aptos-indexer-test-transactions = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "7246f0536d599789d7dd5e9fb776554abbe11eac" } diff --git a/rust/processor/src/processors/fungible_asset_processor.rs b/rust/processor/src/processors/fungible_asset_processor.rs index 902aa684..fdd68ff1 100644 --- a/rust/processor/src/processors/fungible_asset_processor.rs +++ b/rust/processor/src/processors/fungible_asset_processor.rs @@ -169,7 +169,7 @@ async fn insert_to_db( Ok(()) } -fn insert_fungible_asset_activities_query( +pub fn insert_fungible_asset_activities_query( items_to_insert: Vec, ) -> ( impl QueryFragment + diesel::query_builder::QueryId + Send, @@ -186,7 +186,7 @@ fn insert_fungible_asset_activities_query( ) } -fn insert_fungible_asset_metadata_query( +pub fn insert_fungible_asset_metadata_query( items_to_insert: Vec, ) -> ( impl QueryFragment + diesel::query_builder::QueryId + Send, @@ -222,7 +222,7 @@ fn insert_fungible_asset_metadata_query( ) } -fn insert_fungible_asset_balances_query( +pub fn insert_fungible_asset_balances_query( items_to_insert: Vec, ) -> ( impl QueryFragment + diesel::query_builder::QueryId + Send, @@ -239,7 +239,7 @@ fn insert_fungible_asset_balances_query( ) } -fn insert_current_fungible_asset_balances_query( +pub fn insert_current_fungible_asset_balances_query( items_to_insert: Vec, ) -> ( impl QueryFragment + diesel::query_builder::QueryId + Send, @@ -269,7 +269,7 @@ fn insert_current_fungible_asset_balances_query( ) } -fn insert_current_unified_fungible_asset_balances_v1_query( +pub fn insert_current_unified_fungible_asset_balances_v1_query( items_to_insert: Vec, ) -> ( impl QueryFragment + diesel::query_builder::QueryId + Send, @@ -298,7 +298,7 @@ fn insert_current_unified_fungible_asset_balances_v1_query( ) } -fn insert_current_unified_fungible_asset_balances_v2_query( +pub fn insert_current_unified_fungible_asset_balances_v2_query( items_to_insert: Vec, ) -> ( impl QueryFragment + diesel::query_builder::QueryId + Send, @@ -328,7 +328,7 @@ fn insert_current_unified_fungible_asset_balances_v2_query( ) } -fn insert_coin_supply_query( +pub fn insert_coin_supply_query( items_to_insert: Vec, ) -> ( impl QueryFragment + diesel::query_builder::QueryId + Send, @@ -451,7 +451,7 @@ impl ProcessorTrait for FungibleAssetProcessor { } /// V2 coin is called fungible assets and this flow includes all data from V1 in coin_processor -async fn parse_v2_coin( +pub async fn parse_v2_coin( transactions: &[Transaction], ) -> ( Vec, diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index f1054c8e..391e4645 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -114,6 +114,18 @@ bitflags! { } } +impl TableFlags { + pub fn from_set(set: &HashSet) -> Self { + let mut flags = TableFlags::empty(); + for table in set { + if let Some(flag) = TableFlags::from_name(table) { + flags |= flag; + } + } + flags + } +} + pub struct Worker { pub db_pool: ArcDbPool, pub processor_config: ProcessorConfig, diff --git a/rust/sdk-processor/src/config/indexer_processor_config.rs b/rust/sdk-processor/src/config/indexer_processor_config.rs index 361ec56b..8d5fde49 100644 --- a/rust/sdk-processor/src/config/indexer_processor_config.rs +++ b/rust/sdk-processor/src/config/indexer_processor_config.rs @@ -2,7 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 use super::{db_config::DbConfig, processor_config::ProcessorConfig}; -use crate::processors::events_processor::EventsProcessor; +use crate::processors::{ + events_processor::EventsProcessor, fungible_asset_processor::FungibleAssetProcessor, +}; use anyhow::Result; use aptos_indexer_processor_sdk::aptos_indexer_transaction_stream::TransactionStreamConfig; use aptos_indexer_processor_sdk_server_framework::RunnableConfig; @@ -24,6 +26,10 @@ impl RunnableConfig for IndexerProcessorConfig { let events_processor = EventsProcessor::new(self.clone()).await?; events_processor.run_processor().await }, + ProcessorConfig::FungibleAssetProcessor(_) => { + let fungible_asset_processor = FungibleAssetProcessor::new(self.clone()).await?; + fungible_asset_processor.run_processor().await + }, } } diff --git a/rust/sdk-processor/src/config/processor_config.rs b/rust/sdk-processor/src/config/processor_config.rs index 541458a1..f285beac 100644 --- a/rust/sdk-processor/src/config/processor_config.rs +++ b/rust/sdk-processor/src/config/processor_config.rs @@ -1,5 +1,6 @@ -use crate::processors::events_processor::EventsProcessorConfig; +use ahash::AHashMap; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; /// This enum captures the configs for all the different processors that are defined. /// @@ -34,7 +35,8 @@ use serde::{Deserialize, Serialize}; strum(serialize_all = "snake_case") )] pub enum ProcessorConfig { - EventsProcessor(EventsProcessorConfig), + EventsProcessor(DefaultProcessorConfig), + FungibleAssetProcessor(DefaultProcessorConfig), } impl ProcessorConfig { @@ -44,33 +46,23 @@ impl ProcessorConfig { self.into() } } -#[derive(Debug)] -// To ensure that the variants of ProcessorConfig and Processor line up, in the testing -// build path we derive EnumDiscriminants on this enum as well and make sure the two -// sets of variants match up in `test_processor_names_complete`. -#[cfg_attr( - test, - derive(strum::EnumDiscriminants), - strum_discriminants( - derive(strum::EnumVariantNames), - name(ProcessorDiscriminants), - strum(serialize_all = "snake_case") - ) -)] -pub enum Processor { - EventsProcessor, -} -#[cfg(test)] -mod test { - use super::*; - use strum::VariantNames; +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(deny_unknown_fields)] +pub struct DefaultProcessorConfig { + // Number of rows to insert, per chunk, for each DB table. Default per table is ~32,768 (2**16/2) + #[serde(default = "AHashMap::new")] + pub per_table_chunk_sizes: AHashMap, + // Size of channel between steps + #[serde(default = "DefaultProcessorConfig::default_channel_size")] + pub channel_size: usize, + // String vector for deprecated tables to skip db writes + #[serde(default)] + pub deprecated_tables: HashSet, +} - /// This test exists to make sure that when a new processor is added, it is added - /// to both Processor and ProcessorConfig. To make sure this passes, make sure the - /// variants are in the same order (lexicographical) and the names match. - #[test] - fn test_processor_names_complete() { - assert_eq!(ProcessorName::VARIANTS, ProcessorDiscriminants::VARIANTS); +impl DefaultProcessorConfig { + pub const fn default_channel_size() -> usize { + 10 } } diff --git a/rust/sdk-processor/src/processors/events_processor.rs b/rust/sdk-processor/src/processors/events_processor.rs index 379b0135..589eae03 100644 --- a/rust/sdk-processor/src/processors/events_processor.rs +++ b/rust/sdk-processor/src/processors/events_processor.rs @@ -13,7 +13,6 @@ use crate::{ starting_version::get_starting_version, }, }; -use ahash::AHashMap; use anyhow::Result; use aptos_indexer_processor_sdk::{ aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, @@ -21,26 +20,8 @@ use aptos_indexer_processor_sdk::{ common_steps::TransactionStreamStep, traits::IntoRunnableStep, }; -use serde::{Deserialize, Serialize}; use tracing::{debug, info}; -#[derive(Clone, Debug, Deserialize, Serialize)] -#[serde(deny_unknown_fields)] -pub struct EventsProcessorConfig { - // Number of rows to insert, per chunk, for each DB table. Default per table is ~32,768 (2**16/2) - #[serde(default = "AHashMap::new")] - pub per_table_chunk_sizes: AHashMap, - // Size of channel between steps - #[serde(default = "EventsProcessorConfig::default_channel_size")] - pub channel_size: usize, -} - -impl EventsProcessorConfig { - pub const fn default_channel_size() -> usize { - 10 - } -} - pub struct EventsProcessor { pub config: IndexerProcessorConfig, pub db_pool: ArcDbPool, @@ -73,7 +54,7 @@ impl EventsProcessor { pub async fn run_processor(self) -> Result<()> { let processor_name = self.config.processor_config.name(); - // (Optional) Run migrations + // Run migrations match self.config.db_config { DbConfig::PostgresConfig(ref postgres_config) => { run_migrations( @@ -84,19 +65,26 @@ impl EventsProcessor { }, } - // (Optional) Merge the starting version from config and the latest processed version from the DB + // Merge the starting version from config and the latest processed version from the DB let starting_version = get_starting_version(&self.config, self.db_pool.clone()).await?; - // (Optional) Check and update the ledger chain id to ensure we're indexing the correct chain + // Check and update the ledger chain id to ensure we're indexing the correct chain let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone()) .await? .get_chain_id() .await?; check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; - let ProcessorConfig::EventsProcessor(events_processor_config) = - self.config.processor_config; - let channel_size = events_processor_config.channel_size; + let processor_config = match self.config.processor_config { + ProcessorConfig::EventsProcessor(processor_config) => processor_config, + _ => { + return Err(anyhow::anyhow!( + "Invalid processor config for EventsProcessor: {:?}", + self.config.processor_config + )) + }, + }; + let channel_size = processor_config.channel_size; // Define processor steps let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { @@ -105,7 +93,7 @@ impl EventsProcessor { }) .await?; let events_extractor = EventsExtractor {}; - let events_storer = EventsStorer::new(self.db_pool.clone(), events_processor_config); + let events_storer = EventsStorer::new(self.db_pool.clone(), processor_config); let version_tracker = LatestVersionProcessedTracker::new( self.db_pool.clone(), starting_version, @@ -125,12 +113,9 @@ impl EventsProcessor { loop { match buffer_receiver.recv().await { Ok(txn_context) => { - if txn_context.data.is_empty() { - continue; - } debug!( "Finished processing events from versions [{:?}, {:?}]", - txn_context.start_version, txn_context.end_version, + txn_context.metadata.start_version, txn_context.metadata.end_version, ); }, Err(e) => { diff --git a/rust/sdk-processor/src/processors/fungible_asset_processor.rs b/rust/sdk-processor/src/processors/fungible_asset_processor.rs new file mode 100644 index 00000000..251766a0 --- /dev/null +++ b/rust/sdk-processor/src/processors/fungible_asset_processor.rs @@ -0,0 +1,132 @@ +use crate::{ + config::{ + db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig, + processor_config::ProcessorConfig, + }, + steps::{ + common::latest_processed_version_tracker::LatestVersionProcessedTracker, + fungible_asset_processor::{ + fungible_asset_extractor::FungibleAssetExtractor, + fungible_asset_storer::FungibleAssetStorer, + }, + }, + utils::{ + chain_id::check_or_update_chain_id, + database::{new_db_pool, run_migrations, ArcDbPool}, + starting_version::get_starting_version, + }, +}; +use anyhow::Result; +use aptos_indexer_processor_sdk::{ + aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig}, + builder::ProcessorBuilder, + common_steps::TransactionStreamStep, + traits::IntoRunnableStep, +}; +use processor::worker::TableFlags; +use tracing::{debug, info}; + +pub struct FungibleAssetProcessor { + pub config: IndexerProcessorConfig, + pub db_pool: ArcDbPool, +} + +impl FungibleAssetProcessor { + pub async fn new(config: IndexerProcessorConfig) -> Result { + match config.db_config { + DbConfig::PostgresConfig(ref postgres_config) => { + let conn_pool = new_db_pool( + &postgres_config.connection_string, + Some(postgres_config.db_pool_size), + ) + .await + .map_err(|e| { + anyhow::anyhow!( + "Failed to create connection pool for PostgresConfig: {:?}", + e + ) + })?; + + Ok(Self { + config, + db_pool: conn_pool, + }) + }, + } + } + + pub async fn run_processor(self) -> Result<()> { + let processor_name = self.config.processor_config.name(); + + // Run migrations + match self.config.db_config { + DbConfig::PostgresConfig(ref postgres_config) => { + run_migrations( + postgres_config.connection_string.clone(), + self.db_pool.clone(), + ) + .await; + }, + } + + // Merge the starting version from config and the latest processed version from the DB + let starting_version = get_starting_version(&self.config, self.db_pool.clone()).await?; + + // Check and update the ledger chain id to ensure we're indexing the correct chain + let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone()) + .await? + .get_chain_id() + .await?; + check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?; + + let processor_config = match self.config.processor_config { + ProcessorConfig::FungibleAssetProcessor(processor_config) => processor_config, + _ => return Err(anyhow::anyhow!("Processor config is wrong type")), + }; + let channel_size = processor_config.channel_size; + let deprecated_table_flags = TableFlags::from_set(&processor_config.deprecated_tables); + + // Define processor steps + let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig { + starting_version: Some(starting_version), + ..self.config.transaction_stream_config + }) + .await?; + let fa_extractor = FungibleAssetExtractor {}; + let fa_storer = FungibleAssetStorer::new( + self.db_pool.clone(), + processor_config, + deprecated_table_flags, + ); + let version_tracker = LatestVersionProcessedTracker::new( + self.db_pool.clone(), + starting_version, + processor_name.to_string(), + ); + + // Connect processor steps together + let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step( + transaction_stream.into_runnable_step(), + ) + .connect_to(fa_extractor.into_runnable_step(), channel_size) + .connect_to(fa_storer.into_runnable_step(), channel_size) + .connect_to(version_tracker.into_runnable_step(), channel_size) + .end_and_return_output_receiver(channel_size); + + // (Optional) Parse the results + loop { + match buffer_receiver.recv().await { + Ok(txn_context) => { + debug!( + "Finished processing versions [{:?}, {:?}]", + txn_context.metadata.start_version, txn_context.metadata.end_version, + ); + }, + Err(e) => { + info!("No more transactions in channel: {:?}", e); + break Ok(()); + }, + } + } + } +} diff --git a/rust/sdk-processor/src/processors/mod.rs b/rust/sdk-processor/src/processors/mod.rs index 110ce858..29d570b2 100644 --- a/rust/sdk-processor/src/processors/mod.rs +++ b/rust/sdk-processor/src/processors/mod.rs @@ -1 +1,2 @@ pub mod events_processor; +pub mod fungible_asset_processor; diff --git a/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs b/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs index 54b7e7b7..2dc94991 100644 --- a/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs +++ b/rust/sdk-processor/src/steps/common/latest_processed_version_tracker.rs @@ -11,6 +11,7 @@ use aptos_indexer_processor_sdk::{ use async_trait::async_trait; use diesel::{upsert::excluded, ExpressionMethods}; use processor::{db::common::models::processor_status::ProcessorStatus, schema::processor_status}; +use std::marker::PhantomData; use tracing::info; const UPDATE_PROCESSOR_STATUS_SECS: u64 = 1; @@ -25,9 +26,10 @@ where // Next version to process that we expect. next_version: u64, // Last successful batch of sequentially processed transactions. Includes metadata to write to storage. - last_success_batch: Option>, + last_success_batch: Option>, // Tracks all the versions that have been processed out of order. - seen_versions: AHashMap>, + seen_versions: AHashMap>, + _marker: PhantomData, } impl LatestVersionProcessedTracker @@ -42,17 +44,20 @@ where next_version: starting_version, last_success_batch: None, seen_versions: AHashMap::new(), + _marker: PhantomData, } } - fn update_last_success_batch(&mut self, current_batch: TransactionContext) { + fn update_last_success_batch(&mut self, current_batch: TransactionContext<()>) { let mut new_prev_batch = current_batch; // While there are batches in seen_versions that are in order, update the new_prev_batch to the next batch. - while let Some(next_version) = self.seen_versions.remove(&(new_prev_batch.end_version + 1)) + while let Some(next_version) = self + .seen_versions + .remove(&(new_prev_batch.metadata.end_version + 1)) { new_prev_batch = next_version; } - self.next_version = new_prev_batch.end_version + 1; + self.next_version = new_prev_batch.metadata.end_version + 1; self.last_success_batch = Some(new_prev_batch); } @@ -60,13 +65,14 @@ where // Update the processor status if let Some(last_success_batch) = self.last_success_batch.as_ref() { let end_timestamp = last_success_batch + .metadata .end_transaction_timestamp .as_ref() - .map(|t| parse_timestamp(t, last_success_batch.end_version as i64)) + .map(|t| parse_timestamp(t, last_success_batch.metadata.end_version as i64)) .map(|t| t.naive_utc()); let status = ProcessorStatus { processor: self.tracker_name.clone(), - last_success_version: last_success_batch.end_version as i64, + last_success_version: last_success_batch.metadata.end_version as i64, last_transaction_timestamp: end_timestamp, }; execute_with_better_error( @@ -115,32 +121,24 @@ where // ); // If there's a gap in the next_version and current_version, save the current_version to seen_versions for // later processing. - if self.next_version != current_batch.start_version { + if self.next_version != current_batch.metadata.start_version { info!( expected_next_version = self.next_version, step = self.name(), - batch_version = current_batch.start_version, + batch_version = current_batch.metadata.start_version, "Gap detected", ); self.seen_versions - .insert(current_batch.start_version, TransactionContext { - data: vec![], // No data is needed for tracking. This is to avoid clone. - start_version: current_batch.start_version, - end_version: current_batch.end_version, - start_transaction_timestamp: current_batch.start_transaction_timestamp.clone(), - end_transaction_timestamp: current_batch.end_transaction_timestamp.clone(), - total_size_in_bytes: current_batch.total_size_in_bytes, + .insert(current_batch.metadata.start_version, TransactionContext { + data: (), // No data is needed for tracking. + metadata: current_batch.metadata.clone(), }); } else { // info!("No gap detected"); // If the current_batch is the next expected version, update the last success batch self.update_last_success_batch(TransactionContext { - data: vec![], // No data is needed for tracking. This is to avoid clone. - start_version: current_batch.start_version, - end_version: current_batch.end_version, - start_transaction_timestamp: current_batch.start_transaction_timestamp.clone(), - end_transaction_timestamp: current_batch.end_transaction_timestamp.clone(), - total_size_in_bytes: current_batch.total_size_in_bytes, + data: (), // No data is needed for tracking. + metadata: current_batch.metadata.clone(), }); } // Pass through diff --git a/rust/sdk-processor/src/steps/events_processor/events_extractor.rs b/rust/sdk-processor/src/steps/events_processor/events_extractor.rs index 7aec68b2..442148c7 100644 --- a/rust/sdk-processor/src/steps/events_processor/events_extractor.rs +++ b/rust/sdk-processor/src/steps/events_processor/events_extractor.rs @@ -17,14 +17,14 @@ where #[async_trait] impl Processable for EventsExtractor { - type Input = Transaction; - type Output = EventModel; + type Input = Vec; + type Output = Vec; type RunType = AsyncRunType; async fn process( &mut self, - item: TransactionContext, - ) -> Result>, ProcessorError> { + item: TransactionContext>, + ) -> Result>>, ProcessorError> { // info!( // start_version = item.start_version, // end_version = item.end_version, @@ -68,11 +68,7 @@ impl Processable for EventsExtractor { .collect::>(); Ok(Some(TransactionContext { data: events, - start_version: item.start_version, - end_version: item.end_version, - start_transaction_timestamp: item.start_transaction_timestamp, - end_transaction_timestamp: item.end_transaction_timestamp, - total_size_in_bytes: item.total_size_in_bytes, + metadata: item.metadata, })) } } diff --git a/rust/sdk-processor/src/steps/events_processor/events_storer.rs b/rust/sdk-processor/src/steps/events_processor/events_storer.rs index c5560155..f1a09e95 100644 --- a/rust/sdk-processor/src/steps/events_processor/events_storer.rs +++ b/rust/sdk-processor/src/steps/events_processor/events_storer.rs @@ -1,6 +1,6 @@ use crate::{ + config::processor_config::DefaultProcessorConfig, db::common::models::events_models::events::EventModel, - processors::events_processor::EventsProcessorConfig, utils::database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, }; use ahash::AHashMap; @@ -24,11 +24,11 @@ where Self: Sized + Send + 'static, { conn_pool: ArcDbPool, - processor_config: EventsProcessorConfig, + processor_config: DefaultProcessorConfig, } impl EventsStorer { - pub fn new(conn_pool: ArcDbPool, processor_config: EventsProcessorConfig) -> Self { + pub fn new(conn_pool: ArcDbPool, processor_config: DefaultProcessorConfig) -> Self { Self { conn_pool, processor_config, @@ -58,14 +58,14 @@ fn insert_events_query( #[async_trait] impl Processable for EventsStorer { - type Input = EventModel; - type Output = EventModel; + type Input = Vec; + type Output = (); type RunType = AsyncRunType; async fn process( &mut self, - events: TransactionContext, - ) -> Result>, ProcessorError> { + events: TransactionContext>, + ) -> Result>, ProcessorError> { // tracing::info!( // start_version = events.start_version, // end_version = events.end_version, @@ -85,14 +85,17 @@ impl Processable for EventsStorer { Ok(_) => { debug!( "Events version [{}, {}] stored successfully", - events.start_version, events.end_version + events.metadata.start_version, events.metadata.end_version ); - Ok(Some(events)) + Ok(Some(TransactionContext { + data: (), + metadata: events.metadata, + })) }, Err(e) => Err(ProcessorError::DBStoreError { message: format!( "Failed to store events versions {} to {}: {:?}", - events.start_version, events.end_version, e, + events.metadata.start_version, events.metadata.end_version, e, ), // TODO: fix it with a debug_query. query: None, diff --git a/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_extractor.rs b/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_extractor.rs new file mode 100644 index 00000000..8a3d1b61 --- /dev/null +++ b/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_extractor.rs @@ -0,0 +1,85 @@ +use aptos_indexer_processor_sdk::{ + aptos_protos::transaction::v1::Transaction, + traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, + types::transaction_context::TransactionContext, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use processor::{ + db::common::models::{ + coin_models::coin_supply::CoinSupply, + fungible_asset_models::{ + v2_fungible_asset_activities::FungibleAssetActivity, + v2_fungible_asset_balances::{ + CurrentFungibleAssetBalance, CurrentUnifiedFungibleAssetBalance, + FungibleAssetBalance, + }, + v2_fungible_metadata::FungibleAssetMetadataModel, + }, + }, + processors::fungible_asset_processor::parse_v2_coin, +}; + +pub struct FungibleAssetExtractor +where + Self: Sized + Send + 'static, {} + +#[async_trait] +impl Processable for FungibleAssetExtractor { + type Input = Vec; + type Output = ( + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, + ); + type RunType = AsyncRunType; + + async fn process( + &mut self, + transactions: TransactionContext>, + ) -> Result< + Option< + TransactionContext<( + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, + )>, + >, + ProcessorError, + > { + let ( + fungible_asset_activities, + fungible_asset_metadata, + fungible_asset_balances, + current_fungible_asset_balances, + current_unified_fungible_asset_balances, + coin_supply, + ) = parse_v2_coin(&transactions.data).await; + + Ok(Some(TransactionContext { + data: ( + fungible_asset_activities, + fungible_asset_metadata, + fungible_asset_balances, + current_fungible_asset_balances, + current_unified_fungible_asset_balances, + coin_supply, + ), + metadata: transactions.metadata, + })) + } +} + +impl AsyncStep for FungibleAssetExtractor {} + +impl NamedStep for FungibleAssetExtractor { + fn name(&self) -> String { + "FungibleAssetExtractor".to_string() + } +} diff --git a/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_storer.rs b/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_storer.rs new file mode 100644 index 00000000..0431e493 --- /dev/null +++ b/rust/sdk-processor/src/steps/fungible_asset_processor/fungible_asset_storer.rs @@ -0,0 +1,219 @@ +use crate::{ + config::processor_config::DefaultProcessorConfig, + utils::database::{execute_in_chunks, get_config_table_chunk_size, ArcDbPool}, +}; +use ahash::AHashMap; +use anyhow::Result; +use aptos_indexer_processor_sdk::{ + traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable}, + types::transaction_context::TransactionContext, + utils::errors::ProcessorError, +}; +use async_trait::async_trait; +use processor::{ + db::common::models::{ + coin_models::coin_supply::CoinSupply, + fungible_asset_models::{ + v2_fungible_asset_activities::FungibleAssetActivity, + v2_fungible_asset_balances::{ + CurrentFungibleAssetBalance, CurrentUnifiedFungibleAssetBalance, + FungibleAssetBalance, + }, + v2_fungible_metadata::FungibleAssetMetadataModel, + }, + }, + processors::fungible_asset_processor::{ + insert_coin_supply_query, insert_current_fungible_asset_balances_query, + insert_current_unified_fungible_asset_balances_v1_query, + insert_current_unified_fungible_asset_balances_v2_query, + insert_fungible_asset_activities_query, insert_fungible_asset_balances_query, + insert_fungible_asset_metadata_query, + }, + worker::TableFlags, +}; + +pub struct FungibleAssetStorer +where + Self: Sized + Send + 'static, +{ + conn_pool: ArcDbPool, + processor_config: DefaultProcessorConfig, + deprecated_tables: TableFlags, +} + +impl FungibleAssetStorer { + pub fn new( + conn_pool: ArcDbPool, + processor_config: DefaultProcessorConfig, + deprecated_tables: TableFlags, + ) -> Self { + Self { + conn_pool, + processor_config, + deprecated_tables, + } + } +} + +#[async_trait] +impl Processable for FungibleAssetStorer { + type Input = ( + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, + ); + type Output = (); + type RunType = AsyncRunType; + + async fn process( + &mut self, + input: TransactionContext<( + Vec, + Vec, + Vec, + Vec, + Vec, + Vec, + )>, + ) -> Result>, ProcessorError> { + let ( + fungible_asset_activities, + fungible_asset_metadata, + mut fungible_asset_balances, + mut current_fungible_asset_balances, + current_unified_fungible_asset_balances, + mut coin_supply, + ) = input.data; + + let per_table_chunk_sizes: AHashMap = + self.processor_config.per_table_chunk_sizes.clone(); + // if flag turned on we need to not include any value in the table + let (unified_coin_balances, unified_fa_balances): (Vec<_>, Vec<_>) = if self + .deprecated_tables + .contains(TableFlags::CURRENT_UNIFIED_FUNGIBLE_ASSET_BALANCES) + { + (vec![], vec![]) + } else { + // Basically we need to split the current unified balances into v1 and v2 + // by looking at whether asset_type_v2 is null (must be v1 if it's null) + // Note, we can't check asset_type_v1 is none because we're now filling asset_type_v1 + // for certain assets + current_unified_fungible_asset_balances + .into_iter() + .partition(|x| x.asset_type_v2.is_none()) + }; + + if self + .deprecated_tables + .contains(TableFlags::FUNGIBLE_ASSET_BALANCES) + { + fungible_asset_balances.clear(); + } + + if self + .deprecated_tables + .contains(TableFlags::CURRENT_FUNGIBLE_ASSET_BALANCES) + { + current_fungible_asset_balances.clear(); + } + + if self.deprecated_tables.contains(TableFlags::COIN_SUPPLY) { + coin_supply.clear(); + } + + let faa = execute_in_chunks( + self.conn_pool.clone(), + insert_fungible_asset_activities_query, + &fungible_asset_activities, + get_config_table_chunk_size::( + "fungible_asset_activities", + &per_table_chunk_sizes, + ), + ); + let fam = execute_in_chunks( + self.conn_pool.clone(), + insert_fungible_asset_metadata_query, + &fungible_asset_metadata, + get_config_table_chunk_size::( + "fungible_asset_metadata", + &per_table_chunk_sizes, + ), + ); + let fab = execute_in_chunks( + self.conn_pool.clone(), + insert_fungible_asset_balances_query, + &fungible_asset_balances, + get_config_table_chunk_size::( + "fungible_asset_balances", + &per_table_chunk_sizes, + ), + ); + let cfab = execute_in_chunks( + self.conn_pool.clone(), + insert_current_fungible_asset_balances_query, + ¤t_fungible_asset_balances, + get_config_table_chunk_size::( + "current_fungible_asset_balances", + &per_table_chunk_sizes, + ), + ); + let cufab_v1 = execute_in_chunks( + self.conn_pool.clone(), + insert_current_unified_fungible_asset_balances_v1_query, + &unified_coin_balances, + get_config_table_chunk_size::( + "current_unified_fungible_asset_balances", + &per_table_chunk_sizes, + ), + ); + let cufab_v2 = execute_in_chunks( + self.conn_pool.clone(), + insert_current_unified_fungible_asset_balances_v2_query, + &unified_fa_balances, + get_config_table_chunk_size::( + "current_unified_fungible_asset_balances", + &per_table_chunk_sizes, + ), + ); + let cs = execute_in_chunks( + self.conn_pool.clone(), + insert_coin_supply_query, + &coin_supply, + get_config_table_chunk_size::("coin_supply", &per_table_chunk_sizes), + ); + let (faa_res, fam_res, fab_res, cfab_res, cufab1_res, cufab2_res, cs_res) = + tokio::join!(faa, fam, fab, cfab, cufab_v1, cufab_v2, cs); + for res in [ + faa_res, fam_res, fab_res, cfab_res, cufab1_res, cufab2_res, cs_res, + ] { + match res { + Ok(_) => {}, + Err(e) => { + return Err(ProcessorError::DBStoreError { + message: format!( + "Failed to store versions {} to {}: {:?}", + input.metadata.start_version, input.metadata.end_version, e, + ), + query: None, + }) + }, + } + } + + Ok(Some(TransactionContext { + data: (), + metadata: input.metadata, + })) + } +} + +impl AsyncStep for FungibleAssetStorer {} + +impl NamedStep for FungibleAssetStorer { + fn name(&self) -> String { + "FungibleAssetStorer".to_string() + } +} diff --git a/rust/sdk-processor/src/steps/fungible_asset_processor/mod.rs b/rust/sdk-processor/src/steps/fungible_asset_processor/mod.rs new file mode 100644 index 00000000..90fb317c --- /dev/null +++ b/rust/sdk-processor/src/steps/fungible_asset_processor/mod.rs @@ -0,0 +1,2 @@ +pub mod fungible_asset_extractor; +pub mod fungible_asset_storer; diff --git a/rust/sdk-processor/src/steps/mod.rs b/rust/sdk-processor/src/steps/mod.rs index 0d9f7cf7..3c039e31 100644 --- a/rust/sdk-processor/src/steps/mod.rs +++ b/rust/sdk-processor/src/steps/mod.rs @@ -1,2 +1,3 @@ pub mod common; pub mod events_processor; +pub mod fungible_asset_processor;