Skip to content

Commit

Permalink
Migrate FA processor to SDK (#543)
Browse files Browse the repository at this point in the history
* migrate fa processor to sdk

* upgrade sdk dep

* cleanup

* comments
  • Loading branch information
rtso authored Oct 8, 2024
1 parent dc3df93 commit e4ef014
Show file tree
Hide file tree
Showing 16 changed files with 553 additions and 117 deletions.
18 changes: 11 additions & 7 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ testing-transactions = { path = "testing-transactions" }

ahash = { version = "0.8.7", features = ["serde"] }
anyhow = "1.0.86"
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "9ecd252ccff53023664562001dd04c2886488c0d" }
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "9ecd252ccff53023664562001dd04c2886488c0d" }
aptos-indexer-processor-sdk = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "95d76e07dd66a20a0e50a9e6c559885bff7ab52b" }
aptos-indexer-processor-sdk-server-framework = { git = "https://github.com/aptos-labs/aptos-indexer-processor-sdk.git", rev = "95d76e07dd66a20a0e50a9e6c559885bff7ab52b" }
aptos-protos = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "5c48aee129b5a141be2792ffa3d9bd0a1a61c9cb" }
aptos-system-utils = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "202bdccff2b2d333a385ae86a4fcf23e89da9f62" }
aptos-indexer-test-transactions = { git = "https://github.com/aptos-labs/aptos-core.git", rev = "7246f0536d599789d7dd5e9fb776554abbe11eac" }
Expand Down
16 changes: 8 additions & 8 deletions rust/processor/src/processors/fungible_asset_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ async fn insert_to_db(
Ok(())
}

fn insert_fungible_asset_activities_query(
pub fn insert_fungible_asset_activities_query(
items_to_insert: Vec<FungibleAssetActivity>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand All @@ -186,7 +186,7 @@ fn insert_fungible_asset_activities_query(
)
}

fn insert_fungible_asset_metadata_query(
pub fn insert_fungible_asset_metadata_query(
items_to_insert: Vec<FungibleAssetMetadataModel>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand Down Expand Up @@ -222,7 +222,7 @@ fn insert_fungible_asset_metadata_query(
)
}

fn insert_fungible_asset_balances_query(
pub fn insert_fungible_asset_balances_query(
items_to_insert: Vec<FungibleAssetBalance>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand All @@ -239,7 +239,7 @@ fn insert_fungible_asset_balances_query(
)
}

fn insert_current_fungible_asset_balances_query(
pub fn insert_current_fungible_asset_balances_query(
items_to_insert: Vec<CurrentFungibleAssetBalance>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand Down Expand Up @@ -269,7 +269,7 @@ fn insert_current_fungible_asset_balances_query(
)
}

fn insert_current_unified_fungible_asset_balances_v1_query(
pub fn insert_current_unified_fungible_asset_balances_v1_query(
items_to_insert: Vec<CurrentUnifiedFungibleAssetBalance>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand Down Expand Up @@ -298,7 +298,7 @@ fn insert_current_unified_fungible_asset_balances_v1_query(
)
}

fn insert_current_unified_fungible_asset_balances_v2_query(
pub fn insert_current_unified_fungible_asset_balances_v2_query(
items_to_insert: Vec<CurrentUnifiedFungibleAssetBalance>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand Down Expand Up @@ -328,7 +328,7 @@ fn insert_current_unified_fungible_asset_balances_v2_query(
)
}

fn insert_coin_supply_query(
pub fn insert_coin_supply_query(
items_to_insert: Vec<CoinSupply>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand Down Expand Up @@ -451,7 +451,7 @@ impl ProcessorTrait for FungibleAssetProcessor {
}

/// V2 coin is called fungible assets and this flow includes all data from V1 in coin_processor
async fn parse_v2_coin(
pub async fn parse_v2_coin(
transactions: &[Transaction],
) -> (
Vec<FungibleAssetActivity>,
Expand Down
12 changes: 12 additions & 0 deletions rust/processor/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,18 @@ bitflags! {
}
}

impl TableFlags {
pub fn from_set(set: &HashSet<String>) -> Self {
let mut flags = TableFlags::empty();
for table in set {
if let Some(flag) = TableFlags::from_name(table) {
flags |= flag;
}
}
flags
}
}

pub struct Worker {
pub db_pool: ArcDbPool,
pub processor_config: ProcessorConfig,
Expand Down
8 changes: 7 additions & 1 deletion rust/sdk-processor/src/config/indexer_processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

use super::{db_config::DbConfig, processor_config::ProcessorConfig};
use crate::processors::events_processor::EventsProcessor;
use crate::processors::{
events_processor::EventsProcessor, fungible_asset_processor::FungibleAssetProcessor,
};
use anyhow::Result;
use aptos_indexer_processor_sdk::aptos_indexer_transaction_stream::TransactionStreamConfig;
use aptos_indexer_processor_sdk_server_framework::RunnableConfig;
Expand All @@ -24,6 +26,10 @@ impl RunnableConfig for IndexerProcessorConfig {
let events_processor = EventsProcessor::new(self.clone()).await?;
events_processor.run_processor().await
},
ProcessorConfig::FungibleAssetProcessor(_) => {
let fungible_asset_processor = FungibleAssetProcessor::new(self.clone()).await?;
fungible_asset_processor.run_processor().await
},
}
}

Expand Down
48 changes: 20 additions & 28 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::processors::events_processor::EventsProcessorConfig;
use ahash::AHashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;

/// This enum captures the configs for all the different processors that are defined.
///
Expand Down Expand Up @@ -34,7 +35,8 @@ use serde::{Deserialize, Serialize};
strum(serialize_all = "snake_case")
)]
pub enum ProcessorConfig {
EventsProcessor(EventsProcessorConfig),
EventsProcessor(DefaultProcessorConfig),
FungibleAssetProcessor(DefaultProcessorConfig),
}

impl ProcessorConfig {
Expand All @@ -44,33 +46,23 @@ impl ProcessorConfig {
self.into()
}
}
/// Payload-free mirror of `ProcessorConfig`: one variant per processor, used to
/// enumerate processor names independently of their configs.
#[derive(Debug)]
// To ensure that the variants of ProcessorConfig and Processor line up, in the testing
// build path we derive EnumDiscriminants on this enum as well and make sure the two
// sets of variants match up in `test_processor_names_complete`.
#[cfg_attr(
    test,
    derive(strum::EnumDiscriminants),
    strum_discriminants(
        derive(strum::EnumVariantNames),
        name(ProcessorDiscriminants),
        strum(serialize_all = "snake_case")
    )
)]
pub enum Processor {
    EventsProcessor,
}

#[cfg(test)]
mod test {
use super::*;
use strum::VariantNames;
/// Configuration shared by processors (e.g. the `EventsProcessor` and
/// `FungibleAssetProcessor` variants of `ProcessorConfig`), deserialized from the
/// indexer config file. Unknown keys are rejected (`deny_unknown_fields`).
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct DefaultProcessorConfig {
    /// Number of rows to insert, per chunk, for each DB table. Default per table is ~32,768 (2**16/2).
    #[serde(default = "AHashMap::new")]
    pub per_table_chunk_sizes: AHashMap<String, usize>,
    /// Size of channel between steps.
    #[serde(default = "DefaultProcessorConfig::default_channel_size")]
    pub channel_size: usize,
    /// String set of deprecated tables for which DB writes should be skipped.
    #[serde(default)]
    pub deprecated_tables: HashSet<String>,
}

/// This test exists to make sure that when a new processor is added, it is added
/// to both Processor and ProcessorConfig. To make sure this passes, make sure the
/// variants are in the same order (lexicographical) and the names match.
#[test]
fn test_processor_names_complete() {
assert_eq!(ProcessorName::VARIANTS, ProcessorDiscriminants::VARIANTS);
impl DefaultProcessorConfig {
    /// Default capacity of the channel between pipeline steps; used by serde
    /// when `channel_size` is absent from the config file.
    pub const fn default_channel_size() -> usize {
        10
    }
}
45 changes: 15 additions & 30 deletions rust/sdk-processor/src/processors/events_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,15 @@ use crate::{
starting_version::get_starting_version,
},
};
use ahash::AHashMap;
use anyhow::Result;
use aptos_indexer_processor_sdk::{
aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
builder::ProcessorBuilder,
common_steps::TransactionStreamStep,
traits::IntoRunnableStep,
};
use serde::{Deserialize, Serialize};
use tracing::{debug, info};

/// Configuration for the events processor, deserialized from the indexer config
/// file. Unknown keys are rejected (`deny_unknown_fields`).
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct EventsProcessorConfig {
    /// Number of rows to insert, per chunk, for each DB table. Default per table is ~32,768 (2**16/2).
    #[serde(default = "AHashMap::new")]
    pub per_table_chunk_sizes: AHashMap<String, usize>,
    /// Size of channel between steps.
    #[serde(default = "EventsProcessorConfig::default_channel_size")]
    pub channel_size: usize,
}

impl EventsProcessorConfig {
    /// Default capacity of the channel between pipeline steps; used by serde
    /// when `channel_size` is absent from the config file.
    pub const fn default_channel_size() -> usize {
        10
    }
}

pub struct EventsProcessor {
pub config: IndexerProcessorConfig,
pub db_pool: ArcDbPool,
Expand Down Expand Up @@ -73,7 +54,7 @@ impl EventsProcessor {
pub async fn run_processor(self) -> Result<()> {
let processor_name = self.config.processor_config.name();

// (Optional) Run migrations
// Run migrations
match self.config.db_config {
DbConfig::PostgresConfig(ref postgres_config) => {
run_migrations(
Expand All @@ -84,19 +65,26 @@ impl EventsProcessor {
},
}

// (Optional) Merge the starting version from config and the latest processed version from the DB
// Merge the starting version from config and the latest processed version from the DB
let starting_version = get_starting_version(&self.config, self.db_pool.clone()).await?;

// (Optional) Check and update the ledger chain id to ensure we're indexing the correct chain
// Check and update the ledger chain id to ensure we're indexing the correct chain
let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone())
.await?
.get_chain_id()
.await?;
check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;

let ProcessorConfig::EventsProcessor(events_processor_config) =
self.config.processor_config;
let channel_size = events_processor_config.channel_size;
let processor_config = match self.config.processor_config {
ProcessorConfig::EventsProcessor(processor_config) => processor_config,
_ => {
return Err(anyhow::anyhow!(
"Invalid processor config for EventsProcessor: {:?}",
self.config.processor_config
))
},
};
let channel_size = processor_config.channel_size;

// Define processor steps
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
Expand All @@ -105,7 +93,7 @@ impl EventsProcessor {
})
.await?;
let events_extractor = EventsExtractor {};
let events_storer = EventsStorer::new(self.db_pool.clone(), events_processor_config);
let events_storer = EventsStorer::new(self.db_pool.clone(), processor_config);
let version_tracker = LatestVersionProcessedTracker::new(
self.db_pool.clone(),
starting_version,
Expand All @@ -125,12 +113,9 @@ impl EventsProcessor {
loop {
match buffer_receiver.recv().await {
Ok(txn_context) => {
if txn_context.data.is_empty() {
continue;
}
debug!(
"Finished processing events from versions [{:?}, {:?}]",
txn_context.start_version, txn_context.end_version,
txn_context.metadata.start_version, txn_context.metadata.end_version,
);
},
Err(e) => {
Expand Down
Loading

0 comments on commit e4ef014

Please sign in to comment.