Skip to content

Commit

Permalink
Fix search + rebuild speed
Browse files Browse the repository at this point in the history
  • Loading branch information
gostkin committed Nov 27, 2023
1 parent 2996abd commit e2a8e10
Showing 1 changed file with 68 additions and 42 deletions.
110 changes: 68 additions & 42 deletions indexer/tasks/src/multiera/multiera_projected_nft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use pallas::ledger::primitives::babbage::DatumOption;
use pallas::ledger::primitives::Fragment;
use pallas::ledger::traverse::{Asset, MultiEraOutput};
use projected_nft_sdk::{State, Status};
use sea_orm::{FromQueryResult, JoinType, QuerySelect, QueryTrait};
use std::collections::{BTreeSet, HashMap};

use crate::config::ReadonlyConfig::ReadonlyConfig;
Expand All @@ -30,8 +31,8 @@ carp_task! {
configuration PayloadAndReadonlyConfig;
doc "Parses projected NFT contract data";
era multiera;
dependencies [MultieraOutputTask, MultieraUsedInputTask];
read [multiera_txs, multiera_outputs, multiera_used_inputs_to_outputs_map];
dependencies [MultieraOutputTask];
read [multiera_txs, multiera_outputs];
write [];
should_add_task |block, _properties| {
!block.1.is_empty()
Expand All @@ -41,7 +42,6 @@ carp_task! {
task.block,
&previous_data.multiera_txs,
&previous_data.multiera_outputs,
&previous_data.multiera_used_inputs_to_outputs_map,
task.config.address.clone(),
);
merge_result |previous_data, _result| {
Expand Down Expand Up @@ -87,12 +87,22 @@ impl TryFrom<i32> for ProjectedNftOperation {
}
}

#[derive(FromQueryResult)]
pub(crate) struct ProjectedNftInputsQueryOutputResult {
id: i64,
tx_id: i64,
output_index: i32,
tx_hash: Vec<u8>,
operation: i32,
pub asset: String,
pub amount: i64,
}

async fn handle_projected_nft(
db_tx: &DatabaseTransaction,
block: BlockInfo<'_, MultiEraBlock<'_>, BlockGlobalInfo>,
multiera_txs: &[TransactionModel],
multiera_outputs: &[TransactionOutputModel],
multiera_used_inputs_to_outputs_map: &BTreeMap<Vec<u8>, BTreeMap<i64, OutputWithTxData>>,
address: String,
) -> Result<(), DbErr> {
let config_address = hex::decode(address).map_err(|err| {
Expand All @@ -101,6 +111,7 @@ async fn handle_projected_nft(
err
))
})?;

let config_address = cardano_multiplatform_lib::address::Address::from_bytes(config_address)
.map_err(|err| DbErr::Custom(format!("cml can't parse config address: {:?}", err)))?;
let config_payment_cred = match config_address.payment_cred() {
Expand All @@ -112,6 +123,10 @@ async fn handle_projected_nft(
Some(pk) => pk,
};

let used_projected_nfts = get_projected_nft_inputs(db_tx, &block).await?;

let mut queued_projected_nft_records = vec![];

for (tx_body, cardano_transaction) in block.1.txs().iter().zip(multiera_txs) {
let mut outputs_map = HashMap::new();
for output_model in multiera_outputs
Expand All @@ -121,40 +136,9 @@ async fn handle_projected_nft(
outputs_map.insert(output_model.output_index, output_model.clone());
}

let mut potential_projected_nft_inputs = vec![];
for input in tx_body.inputs() {
let output_for_input = multiera_used_inputs_to_outputs_map
.get(&input.hash().to_vec())
.ok_or(DbErr::Custom(format!(
"can't find input: {}@{}",
input.hash().clone(),
input.index()
)))?
.get(&(input.index() as i64))
.ok_or(DbErr::Custom(format!(
"can't find input: {}@{}",
input.hash().clone(),
input.index()
)))?
.clone();
potential_projected_nft_inputs.push(output_for_input.model.id);
}

let seen_projected_nfts = ProjectedNft::find()
.filter(
potential_projected_nft_inputs
.into_iter()
.fold(Condition::any(), |cond, addition| {
cond.add(ProjectedNftColumn::UtxoId.eq(addition))
}),
)
.all(db_tx)
.await?;

let mut queued_projected_nft_records = vec![];

for projected_nft in seen_projected_nfts.iter().filter(|projected_nft| {
for projected_nft in used_projected_nfts.iter().filter(|projected_nft| {
projected_nft.operation == i32::from(ProjectedNftOperation::Unlocking)
&& projected_nft.tx_id == cardano_transaction.id
}) {
queued_projected_nft_records.push(entity::projected_nft::ActiveModel {
utxo_id: Set(None),
Expand Down Expand Up @@ -210,17 +194,59 @@ async fn handle_projected_nft(
});
}
}
}

if !queued_projected_nft_records.is_empty() {
ProjectedNft::insert_many(queued_projected_nft_records)
.exec(db_tx)
.await?;
}
if !queued_projected_nft_records.is_empty() {
ProjectedNft::insert_many(queued_projected_nft_records)
.exec(db_tx)
.await?;
}

Ok(())
}

async fn get_projected_nft_inputs(
db_tx: &DatabaseTransaction,
block: &BlockInfo<'_, MultiEraBlock<'_>, BlockGlobalInfo>,
) -> Result<Vec<ProjectedNftInputsQueryOutputResult>, DbErr> {
let inputs_condition = block
.1
.txs()
.iter()
.flat_map(|tx| {
tx.inputs()
.iter()
.map(|input| (input.hash().to_vec(), input.index()))
.collect::<Vec<(Vec<u8>, u64)>>()
})
.fold(Condition::any(), |cond, new_input| {
cond.add(
Condition::all()
.add(TransactionOutputColumn::OutputIndex.eq(new_input.1))
.add(TransactionColumn::Hash.eq(new_input.0)),
)
});

ProjectedNft::find()
.select_only()
.column(TransactionOutputColumn::Id)
.column(TransactionOutputColumn::TxId)
.column(TransactionOutputColumn::OutputIndex)
.column(ProjectedNftColumn::Operation)
.column(ProjectedNftColumn::Asset)
.column(ProjectedNftColumn::Amount)
.column_as(TransactionColumn::Hash, "tx_hash")
.join(
JoinType::InnerJoin,
ProjectedNftRelation::TransactionOutput.def(),
)
.join(JoinType::InnerJoin, ProjectedNftRelation::Transaction.def())
.filter(inputs_condition)
.into_model::<ProjectedNftInputsQueryOutputResult>()
.all(db_tx)
.await
}

fn extract_operation_and_datum(output: &MultiEraOutput) -> (ProjectedNftOperation, Vec<u8>) {
let datum_option = match output.datum() {
Some(datum) => DatumOption::from(datum.clone()),
Expand Down

0 comments on commit e2a8e10

Please sign in to comment.