From e2a8e10983a278c6e09fef8f9727356427638616 Mon Sep 17 00:00:00 2001 From: Eugene Gostkin Date: Tue, 7 Nov 2023 01:34:03 +0300 Subject: [PATCH] Fix search + rebuild speed --- .../src/multiera/multiera_projected_nft.rs | 110 +++++++++++------- 1 file changed, 68 insertions(+), 42 deletions(-) diff --git a/indexer/tasks/src/multiera/multiera_projected_nft.rs b/indexer/tasks/src/multiera/multiera_projected_nft.rs index cc099ced..f93067c8 100644 --- a/indexer/tasks/src/multiera/multiera_projected_nft.rs +++ b/indexer/tasks/src/multiera/multiera_projected_nft.rs @@ -4,6 +4,7 @@ use pallas::ledger::primitives::babbage::DatumOption; use pallas::ledger::primitives::Fragment; use pallas::ledger::traverse::{Asset, MultiEraOutput}; use projected_nft_sdk::{State, Status}; +use sea_orm::{FromQueryResult, JoinType, QuerySelect, QueryTrait}; use std::collections::{BTreeSet, HashMap}; use crate::config::ReadonlyConfig::ReadonlyConfig; @@ -30,8 +31,8 @@ carp_task! { configuration PayloadAndReadonlyConfig; doc "Parses projected NFT contract data"; era multiera; - dependencies [MultieraOutputTask, MultieraUsedInputTask]; - read [multiera_txs, multiera_outputs, multiera_used_inputs_to_outputs_map]; + dependencies [MultieraOutputTask]; + read [multiera_txs, multiera_outputs]; write []; should_add_task |block, _properties| { !block.1.is_empty() @@ -41,7 +42,6 @@ carp_task! { task.block, &previous_data.multiera_txs, &previous_data.multiera_outputs, - &previous_data.multiera_used_inputs_to_outputs_map, task.config.address.clone(), ); merge_result |previous_data, _result| { @@ -87,12 +87,22 @@ impl TryFrom for ProjectedNftOperation { } } +#[derive(FromQueryResult)] +pub(crate) struct ProjectedNftInputsQueryOutputResult { + id: i64, + tx_id: i64, + output_index: i32, + tx_hash: Vec, + operation: i32, + pub asset: String, + pub amount: i64, +} + async fn handle_projected_nft( db_tx: &DatabaseTransaction, block: BlockInfo<'_, MultiEraBlock<'_>, BlockGlobalInfo>, multiera_txs: &[TransactionModel], multiera_outputs: &[TransactionOutputModel], - multiera_used_inputs_to_outputs_map: &BTreeMap, BTreeMap>, address: String, ) -> Result<(), DbErr> { let config_address = hex::decode(address).map_err(|err| { @@ -101,6 +111,7 @@ async fn handle_projected_nft( err )) })?; + let config_address = cardano_multiplatform_lib::address::Address::from_bytes(config_address) .map_err(|err| DbErr::Custom(format!("cml can't parse config address: {:?}", err)))?; let config_payment_cred = match config_address.payment_cred() { @@ -112,6 +123,10 @@ async fn handle_projected_nft( Some(pk) => pk, }; + let used_projected_nfts = get_projected_nft_inputs(db_tx, &block).await?; + + let mut queued_projected_nft_records = vec![]; + for (tx_body, cardano_transaction) in block.1.txs().iter().zip(multiera_txs) { let mut outputs_map = HashMap::new(); for output_model in multiera_outputs @@ -121,40 +136,9 @@ async fn handle_projected_nft( outputs_map.insert(output_model.output_index, output_model.clone()); } - let mut potential_projected_nft_inputs = vec![]; - for input in tx_body.inputs() { - let output_for_input = multiera_used_inputs_to_outputs_map - .get(&input.hash().to_vec()) - .ok_or(DbErr::Custom(format!( - "can't find input: {}@{}", - input.hash().clone(), - input.index() - )))? - .get(&(input.index() as i64)) - .ok_or(DbErr::Custom(format!( - "can't find input: {}@{}", - input.hash().clone(), - input.index() - )))? - .clone(); - potential_projected_nft_inputs.push(output_for_input.model.id); - } - - let seen_projected_nfts = ProjectedNft::find() - .filter( - potential_projected_nft_inputs - .into_iter() - .fold(Condition::any(), |cond, addition| { - cond.add(ProjectedNftColumn::UtxoId.eq(addition)) - }), - ) - .all(db_tx) - .await?; - - let mut queued_projected_nft_records = vec![]; - - for projected_nft in seen_projected_nfts.iter().filter(|projected_nft| { + for projected_nft in used_projected_nfts.iter().filter(|projected_nft| { projected_nft.operation == i32::from(ProjectedNftOperation::Unlocking) + && projected_nft.tx_id == cardano_transaction.id }) { queued_projected_nft_records.push(entity::projected_nft::ActiveModel { utxo_id: Set(None), @@ -210,17 +194,59 @@ async fn handle_projected_nft( }); } } + } - if !queued_projected_nft_records.is_empty() { - ProjectedNft::insert_many(queued_projected_nft_records) - .exec(db_tx) - .await?; - } + if !queued_projected_nft_records.is_empty() { + ProjectedNft::insert_many(queued_projected_nft_records) + .exec(db_tx) + .await?; } Ok(()) } +async fn get_projected_nft_inputs( + db_tx: &DatabaseTransaction, + block: &BlockInfo<'_, MultiEraBlock<'_>, BlockGlobalInfo>, +) -> Result, DbErr> { + let inputs_condition = block + .1 + .txs() + .iter() + .flat_map(|tx| { + tx.inputs() + .iter() + .map(|input| (input.hash().to_vec(), input.index())) + .collect::, u64)>>() + }) + .fold(Condition::any(), |cond, new_input| { + cond.add( + Condition::all() + .add(TransactionOutputColumn::OutputIndex.eq(new_input.1)) + .add(TransactionColumn::Hash.eq(new_input.0)), + ) + }); + + ProjectedNft::find() + .select_only() + .column(TransactionOutputColumn::Id) + .column(TransactionOutputColumn::TxId) + .column(TransactionOutputColumn::OutputIndex) + .column(ProjectedNftColumn::Operation) + .column(ProjectedNftColumn::Asset) + .column(ProjectedNftColumn::Amount) + .column_as(TransactionColumn::Hash, "tx_hash") + .join( + JoinType::InnerJoin, + ProjectedNftRelation::TransactionOutput.def(), + ) + .join(JoinType::InnerJoin, ProjectedNftRelation::Transaction.def()) + .filter(inputs_condition) + .into_model::() + .all(db_tx) + .await +} + fn extract_operation_and_datum(output: &MultiEraOutput) -> (ProjectedNftOperation, Vec) { let datum_option = match output.datum() { Some(datum) => DatumOption::from(datum.clone()),