Skip to content

Commit

Permalink
update cardano source (less settings required) and add shelley genesi…
Browse files Browse the repository at this point in the history
…s processing task
  • Loading branch information
ecioppettini committed Sep 24, 2024
1 parent 2fa8e10 commit 165e11b
Show file tree
Hide file tree
Showing 23 changed files with 676 additions and 428 deletions.
245 changes: 28 additions & 217 deletions Cargo.lock

Large diffs are not rendered by default.

13 changes: 9 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@ members = [


[workspace.dependencies]
cml-chain = { version = "6.0.0" }
cml-core = { version = "6.0.0" }
cml-crypto = { version = "6.0.0" }
cml-multi-era = { version = "6.0.0" }
# cml-chain = { version = "6.0.0" }
# cml-core = { version = "6.0.0" }
# cml-crypto = { version = "6.0.0" }
# cml-multi-era = { version = "6.0.0" }

cml-chain = { git = "https://github.com/dcSpark/cardano-multiplatform-lib", rev = "b7acbd3634f5ba8402a9704f0413bd434ed157c3" }
cml-core = { git = "https://github.com/dcSpark/cardano-multiplatform-lib", rev = "b7acbd3634f5ba8402a9704f0413bd434ed157c3" }
cml-crypto = { git = "https://github.com/dcSpark/cardano-multiplatform-lib", rev = "b7acbd3634f5ba8402a9704f0413bd434ed157c3" }
cml-multi-era = { git = "https://github.com/dcSpark/cardano-multiplatform-lib", rev = "b7acbd3634f5ba8402a9704f0413bd434ed157c3" }
6 changes: 3 additions & 3 deletions indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ strip = true

[dependencies]
# [core]
dcspark-core = { git = "https://github.com/dcSpark/dcspark-core.git", rev = "176ea4830d7c3d00eca1c0a4246e9b364b889851" }
dcspark-blockchain-source = { git = "https://github.com/dcSpark/dcspark-core.git", rev = "176ea4830d7c3d00eca1c0a4246e9b364b889851" }
multiverse = { git = "https://github.com/dcSpark/dcspark-core.git", rev = "176ea4830d7c3d00eca1c0a4246e9b364b889851" }
dcspark-core = { git = "https://github.com/dcSpark/dcspark-core.git", rev = "ebb245ac047f9d45dba97f07a5bb525ffd81a539" }
dcspark-blockchain-source = { git = "https://github.com/dcSpark/dcspark-core.git", rev = "ebb245ac047f9d45dba97f07a5bb525ffd81a539" }
multiverse = { git = "https://github.com/dcSpark/dcspark-core.git", rev = "ebb245ac047f9d45dba97f07a5bb525ffd81a539" }

# [local]
entity = { path = "entity" }
Expand Down
15 changes: 0 additions & 15 deletions indexer/configs/custom.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,6 @@ sink:
relay:
- "localhost"
- 3001
from:
BlockHeader:
slot_nb: 1
hash: "ba8066f73eb9cf4ad9adf38051c54d3a51d92cb98561cffc1f202b1b97739cd5"
genesis_parent: "0ded594a3411f6d3236228abc1e2ef8c2a21e09d859ea23bfc2182f92853cba8"
genesis:
BlockHeader:
slot_nb: 0
hash: "7a32184d9e0068b0fa75fd0ecaad798f9bc573d4921c519b12968e26ff0747a3"
shelley_era_config:
first_slot: 0
start_epoch: 0
known_time: 1722355346
slot_length: 1
epoch_length_seconds: 500

start_block:

14 changes: 14 additions & 0 deletions indexer/entity/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,17 @@ impl TryFrom<i32> for EraValue {
}
}
}

impl EraValue {
pub fn to_str(&self) -> &'static str {
match self {
EraValue::Byron => "byron",
EraValue::Shelley => "shelley",
EraValue::Allegra => "allegra",
EraValue::Mary => "mary",
EraValue::Alonzo => "alonzo",
EraValue::Babbage => "babbage",
EraValue::Conway => "conway",
}
}
}
32 changes: 32 additions & 0 deletions indexer/entity/src/genesis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use sea_orm::entity::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Deserialize, Serialize)]
#[sea_orm(table_name = "Era")]
pub struct Model {
#[sea_orm(primary_key)]
pub era: i32,
pub block_id: i32,
pub block_height: i32,
pub first_slot: i64,
pub start_epoch: i64,
pub epoch_length_seconds: i64,
}

#[derive(Copy, Clone, Debug, DeriveRelation, EnumIter)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::block::Entity",
from = "Column::BlockId",
to = "super::block::Column::Id"
)]
Block,
}

impl Related<super::block::Entity> for Entity {
fn to() -> RelationDef {
Relation::Block.def()
}
}

impl ActiveModelBehavior for ActiveModel {}
2 changes: 1 addition & 1 deletion indexer/entity/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ pub mod asset_mint;
pub mod asset_utxos;
pub mod cip25_entry;
pub mod dex_swap;
pub mod genesis;
pub mod governance_votes;
pub mod native_asset;
pub mod plutus_data;
pub mod plutus_data_hash;
pub mod projected_nft;
// todo: rename to pool?
pub mod stake_delegation;
pub mod stake_delegation_drep;
pub mod transaction_metadata;
4 changes: 4 additions & 0 deletions indexer/entity/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ pub use super::dex_swap::{
ActiveModel as DexSwapActiveModel, Column as DexSwapColumn, Entity as DexSwap,
Model as DexSwapModel, PrimaryKey as DexSwapPrimaryKey, Relation as DexSwapRelation,
};
pub use super::genesis::{
ActiveModel as GenesisActiveModel, Column as GenesisColumn, Entity as Genesis,
Model as GenesisModel, PrimaryKey as GenesisPrimaryKey, Relation as GenesisRelation,
};
pub use super::governance_votes::{
ActiveModel as GovernanceVoteActiveModel, Column as GovernanceVoteColumn,
Entity as GovernanceVote, Model as GovernanceVoteModel, PrimaryKey as GovernanceVotePrimaryKey,
Expand Down
2 changes: 2 additions & 0 deletions indexer/execution_plans/default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ include_payload=false
[GenesisTransactionTask]
include_payload=true

[ShelleyGenesisBlockTask]

[ByronBlockTask]
readonly=false
include_payload=false
Expand Down
2 changes: 2 additions & 0 deletions indexer/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod m20231220_000018_asset_utxo_table;
mod m20240229_000019_add_block_tx_count_column;
mod m20240326_000020_create_drep_delegation_table;
mod m20240326_000021_create_governance_voting_table;
mod m20240920_000022_create_genesis_tracking_table;

pub struct Migrator;

Expand Down Expand Up @@ -53,6 +54,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240229_000019_add_block_tx_count_column::Migration),
Box::new(m20240326_000020_create_drep_delegation_table::Migration),
Box::new(m20240326_000021_create_governance_voting_table::Migration),
Box::new(m20240920_000022_create_genesis_tracking_table::Migration),
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use sea_schema::migration::prelude::*;

use entity::genesis::*;
use entity::prelude::{Block, BlockColumn};

pub struct Migration;

impl MigrationName for Migration {
fn name(&self) -> &str {
"m20240920_000022_create_genesis_tracking_table"
}
}

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Entity)
.if_not_exists()
.col(
ColumnDef::new(Column::Era)
.integer()
.not_null()
.primary_key(),
)
.col(ColumnDef::new(Column::BlockId).integer().not_null())
.col(ColumnDef::new(Column::BlockHeight).integer().not_null())
.col(ColumnDef::new(Column::FirstSlot).big_integer().not_null())
.col(ColumnDef::new(Column::StartEpoch).big_integer().not_null())
.col(
ColumnDef::new(Column::EpochLengthSeconds)
.big_integer()
.not_null(),
)
.foreign_key(
ForeignKey::create()
.name("fk-transaction-block_id")
.from(Entity, Column::BlockId)
.to(Block, BlockColumn::Id)
.on_delete(ForeignKeyAction::Cascade),
)
.to_owned(),
)
.await?;

Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(Entity).to_owned())
.await
}
}
8 changes: 1 addition & 7 deletions indexer/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crate::sink::Sink;
use crate::types::StoppableService;
use async_trait::async_trait;
use dcspark_blockchain_source::{GetNextFrom, PullFrom, Source};
use entity::block::EraValue;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
Expand Down Expand Up @@ -45,9 +44,6 @@ impl<

let mut perf_aggregator = PerfAggregator::new();

// TODO: fetch from DB
let mut latest_era: Option<EraValue> = None;

while self.running.load(SeqCst) {
let event_fetch_start = std::time::Instant::now();
let event = self.source.pull(&pull_from).await?;
Expand All @@ -59,9 +55,7 @@ impl<
};
perf_aggregator.block_fetch += event_fetch_start.elapsed();
let new_from = event.next_from().unwrap_or(pull_from);
self.sink
.process(event, &mut perf_aggregator, &mut latest_era)
.await?;
self.sink.process(event, &mut perf_aggregator).await?;
pull_from = new_from;
}

Expand Down
77 changes: 2 additions & 75 deletions indexer/src/genesis.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::anyhow;
use anyhow::{anyhow, Context as _};
use entity::block::EraValue;
use std::fs;
use std::sync::{Arc, Mutex};
Expand All @@ -23,7 +23,7 @@ pub async fn process_byron_genesis(
tracing::info!("Parsing genesis file...");
let mut time_counter = std::time::Instant::now();

let file = fs::File::open(genesis_file).expect("Failed to open genesis file");
let file = fs::File::open(genesis_file).context("Failed to open genesis file")?;
let genesis_file: Box<GenesisData> = Box::new(
parse_genesis_data(file).map_err(|err| anyhow!("can't parse genesis data: {:?}", err))?,
);
Expand Down Expand Up @@ -85,76 +85,3 @@ pub async fn insert_byron_genesis(

Ok(())
}

pub async fn process_shelley_genesis(
conn: &DatabaseConnection,
genesis_file: &str,
exec_plan: Arc<ExecutionPlan>,
) -> anyhow::Result<()> {
let task_perf_aggregator = Arc::new(Mutex::new(TaskPerfAggregator::default()));

tracing::info!("Parsing genesis file...");
let mut time_counter = std::time::Instant::now();

let file = fs::File::open(genesis_file).expect("Failed to open genesis file");
let genesis_file: Box<GenesisData> = Box::new(
parse_genesis_data(file).map_err(|err| anyhow!("can't parse genesis data: {:?}", err))?,
);

tracing::info!(
"Finished parsing genesis file after {:?}",
time_counter.elapsed()
);
time_counter = std::time::Instant::now();

tracing::info!("Inserting Shelley genesis data into database...");
conn.transaction(|txn| {
Box::pin(insert_shelley_genesis(
txn,
genesis_file,
exec_plan.clone(),
task_perf_aggregator.clone(),
))
})
.await?;

tracing::info!(
"Finished inserting genesis data after {:?}",
time_counter.elapsed()
);
tracing::trace!(
"Genesis task-wise time spent:\n{:#?}",
task_perf_aggregator.lock().unwrap()
);

Ok(())
}

pub async fn insert_shelley_genesis(
txn: &DatabaseTransaction,
genesis_file: Box<GenesisData>,
exec_plan: Arc<ExecutionPlan>,
task_perf_aggregator: Arc<Mutex<TaskPerfAggregator>>,
) -> Result<(), DbErr> {
let genesis_hash = genesis_file.genesis_prev.to_raw_bytes();
tracing::info!(
"Starting sync based on genesis hash {}",
hex::encode(genesis_hash)
);

let block_global_info = BlockGlobalInfo {
era: EraValue::Shelley,
epoch: None,
epoch_slot: None,
};

process_genesis_block(
txn,
("", &genesis_file, &block_global_info),
&exec_plan,
task_perf_aggregator.clone(),
)
.await?;

Ok(())
}
12 changes: 6 additions & 6 deletions indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use dcspark_blockchain_source::{GetNextFrom, Source};
use migration::async_std::path::PathBuf;
use oura::sources::BearerKind;
use serde::Deserialize;
use std::borrow::Cow;
use std::fs::File;
use std::process::exit;
use std::sync::atomic::{AtomicBool, Ordering};
Expand Down Expand Up @@ -65,14 +64,12 @@ pub enum SinkConfig {
},
}

pub enum Network {}

#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
#[serde(deny_unknown_fields)]
pub enum SourceConfig {
Oura { socket: String, bearer: BearerKind },
CardanoNet { relay: (Cow<'static, str>, u16) },
CardanoNet { relay: (String, u16) },
}

#[derive(Debug, Clone, Deserialize)]
Expand Down Expand Up @@ -239,8 +236,11 @@ async fn main() -> anyhow::Result<()> {
.ok_or_else(|| anyhow!("Starting points list is empty"))?;

let network_config = dcspark_blockchain_source::cardano::NetworkConfiguration {
relay: relay.clone(),
from: start_from.clone(),
relay: dcspark_blockchain_source::cardano::Relay::UrlPort(
relay.clone().0,
relay.clone().1,
),
from: None,
..base_config
};

Expand Down
2 changes: 0 additions & 2 deletions indexer/src/sink.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::perf_aggregator::PerfAggregator;
use async_trait::async_trait;
use dcspark_blockchain_source::{EventObject, PullFrom};
use entity::block::EraValue;

#[async_trait]
pub trait Sink {
Expand All @@ -13,6 +12,5 @@ pub trait Sink {
&mut self,
event: Self::Event,
perf_aggregator: &mut PerfAggregator,
latest_era: &mut Option<EraValue>,
) -> anyhow::Result<()>;
}
Loading

0 comments on commit 165e11b

Please sign in to comment.