Skip to content

Commit

Permalink
Merge pull request #516 from paritytech/AndreiEres/on-demand-orders
Browse files Browse the repository at this point in the history
Track on-demand orders
  • Loading branch information
AndreiEres authored Aug 24, 2023
2 parents 6613a13 + afe5ee7 commit bfb7edf
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 11 deletions.
30 changes: 29 additions & 1 deletion essentials/src/api/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
},
polkadot_primitives::{CoreIndex, ValidatorIndex},
},
types::{Assignment, BlockNumber, ClaimQueue, CoreAssignment, CoreOccupied, ParasEntry},
types::{Assignment, BlockNumber, ClaimQueue, CoreAssignment, CoreOccupied, OnDemandOrder, ParasEntry},
};
use log::error;
use std::collections::{BTreeMap, VecDeque};
Expand Down Expand Up @@ -101,6 +101,34 @@ pub(crate) fn decode_claim_queue(raw: &Value<u32>) -> Result<ClaimQueue, SubxtWr
Ok(claim_queue)
}

pub(crate) fn decode_on_demand_order(raw: &Composite<u32>) -> Result<OnDemandOrder, SubxtWrapperError> {
match raw {
Composite::Named(v) => {
let raw_para_id = v
.iter()
.find_map(|(field, value)| if field == "para_id" { Some(value) } else { None })
.ok_or(SubxtWrapperError::DecodeDynamicError(
"named composite with field `para_id`".to_string(),
ValueDef::Composite(raw.clone()),
))?;
let raw_spot_price = v
.iter()
.find_map(|(field, value)| if field == "spot_price" { Some(value) } else { None })
.ok_or(SubxtWrapperError::DecodeDynamicError(
"named composite with field `spot_price`".to_string(),
ValueDef::Composite(raw.clone()),
))?;

Ok(OnDemandOrder {
para_id: decode_composite_u128_value(raw_para_id)? as u32,
spot_price: decode_u128_value(raw_spot_price)?,
})
},
_ =>
Err(SubxtWrapperError::DecodeDynamicError("named composite".to_string(), ValueDef::Composite(raw.clone()))),
}
}

fn decode_paras_entry_option(raw: &Value<u32>) -> Result<Option<ParasEntry>, SubxtWrapperError> {
match decode_option(raw)? {
Some(v) => Ok(Some(decode_paras_entry(v)?)),
Expand Down
2 changes: 1 addition & 1 deletion essentials/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with polkadot-introspector. If not, see <http://www.gnu.org/licenses/>.
//

mod dynamic;
pub mod dynamic;
mod storage;
pub mod subxt_wrapper;

Expand Down
22 changes: 17 additions & 5 deletions essentials/src/chain_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
// along with polkadot-introspector. If not, see <http://www.gnu.org/licenses/>.
//

use crate::metadata::{
polkadot::{
para_inclusion::events::{CandidateBacked, CandidateIncluded, CandidateTimedOut},
paras_disputes::events::{DisputeConcluded, DisputeInitiated},
use crate::{
api::dynamic::decode_on_demand_order,
metadata::{
polkadot::{
para_inclusion::events::{CandidateBacked, CandidateIncluded, CandidateTimedOut},
paras_disputes::events::{DisputeConcluded, DisputeInitiated},
},
polkadot_primitives::CandidateDescriptor,
},
polkadot_primitives::CandidateDescriptor,
types::OnDemandOrder,
};
use color_eyre::{eyre::eyre, Result};
use parity_scale_codec::{Decode, Encode};
Expand All @@ -42,6 +46,8 @@ pub enum ChainEvent<T: subxt::Config> {
DisputeConcluded(SubxtDispute, SubxtDisputeResult),
/// Backing, inclusion, time out for a parachain candidate
CandidateChanged(Box<SubxtCandidateEvent>),
/// On-demand parachain placed its order
OnDemandOrderPlaced(<PolkadotConfig as subxt::Config>::Hash, OnDemandOrder),
/// Anything undecoded
RawEvent(<PolkadotConfig as subxt::Config>::Hash, subxt::events::EventDetails<T>),
}
Expand Down Expand Up @@ -141,6 +147,12 @@ pub async fn decode_chain_event<T: subxt::Config>(
))))
}

// TODO: Use `is_specific_event` as soon as shows up in types
if event.pallet_name() == "OnDemandAssignmentProvider" && event.variant_name() == "OnDemandOrderPlaced" {
let decoded = decode_on_demand_order(&event.field_values()?)?;
return Ok(ChainEvent::OnDemandOrderPlaced(block_hash, decoded))
}

Ok(ChainEvent::RawEvent(block_hash, event))
}

Expand Down
23 changes: 22 additions & 1 deletion essentials/src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
chain_subscription::ChainSubscriptionEvent,
metadata::polkadot_primitives::DisputeStatement,
storage::{RecordTime, RecordsStorageConfig, StorageEntry},
types::{Timestamp, H256},
types::{OnDemandOrder, Timestamp, H256},
utils::RetryOptions,
};
use candidate_record::{CandidateDisputed, CandidateInclusionRecord, CandidateRecord, DisputeResult};
Expand Down Expand Up @@ -104,6 +104,8 @@ pub enum CollectorPrefixType {
InherentData,
/// Dispute information indexed by Parachain-Id; data is DisputeInfo
Dispute(u32),
/// On-demand order information by parachain id
OnDemandOrder(u32),
}

/// A type that defines prefix + hash itself
Expand Down Expand Up @@ -335,6 +337,8 @@ impl Collector {
ChainEvent::DisputeInitiated(dispute_event) => self.process_dispute_initiated(dispute_event).await,
ChainEvent::DisputeConcluded(dispute_event, dispute_outcome) =>
self.process_dispute_concluded(dispute_event, dispute_outcome).await,
ChainEvent::OnDemandOrderPlaced(block_hash, order) =>
self.process_on_demand_order_placed(block_hash, order).await,
_ => Ok(()),
}
}
Expand Down Expand Up @@ -962,6 +966,23 @@ impl Collector {
Ok(())
}

async fn process_on_demand_order_placed(
&self,
block_hash: &H256,
order: &OnDemandOrder,
) -> Result<(), CollectorError> {
self.storage_write_prefixed(
CollectorPrefixType::OnDemandOrder(order.para_id),
*block_hash,
StorageEntry::new_onchain(
RecordTime::with_ts(self.state.current_relay_chain_block_number, get_unix_time_unwrap()),
order,
),
)
.await?;
Ok(())
}

async fn storage_read_prefixed(&self, p: CollectorPrefixType, k: H256) -> Option<StorageEntry> {
self.api.storage().storage_read_prefixed(p, k).await
}
Expand Down
9 changes: 9 additions & 0 deletions essentials/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::metadata::{
},
polkadot_primitives::CoreIndex,
};
use parity_scale_codec::{Decode, Encode};
use std::collections::{BTreeMap, VecDeque};
use subxt::utils;

Expand Down Expand Up @@ -81,3 +82,11 @@ pub enum CoreOccupied {
/// A paras.
Paras,
}

// TODO: Take it from runtime types v5
/// Temporary abstraction to cover `Event::OnDemandAssignmentProvider`
#[derive(Debug, Decode, Encode)]
pub struct OnDemandOrder {
pub para_id: u32,
pub spot_price: u128,
}
23 changes: 21 additions & 2 deletions parachain-tracer/src/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
use super::{progress::ParachainProgressUpdate, tracker::DisputesTracker};
use clap::Parser;
use color_eyre::Result;
use polkadot_introspector_essentials::constants::STANDARD_BLOCK_TIME;
use polkadot_introspector_essentials::{constants::STANDARD_BLOCK_TIME, types::OnDemandOrder};
use prometheus_endpoint::{
prometheus::{Gauge, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts},
prometheus::{Gauge, GaugeVec, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts},
Registry,
};
use std::{net::ToSocketAddrs, time::Duration};
Expand Down Expand Up @@ -73,6 +73,8 @@ struct MetricsInner {
para_backing_times: HistogramVec,
/// Average candidate inclusion time measured in seconds.
para_block_times_sec: HistogramVec,
/// Parachain's on-demand orders
para_on_demand_orders: GaugeVec,
/// Finality lag
finality_lag: Gauge,
}
Expand Down Expand Up @@ -195,6 +197,16 @@ impl Metrics {
}
}

pub(crate) fn on_on_demand_order(&self, order: &OnDemandOrder) {
if let Some(metrics) = &self.0 {
let para_str: String = order.para_id.to_string();
metrics
.para_on_demand_orders
.with_label_values(&[&para_str[..]])
.set(order.spot_price as f64);
}
}

pub(crate) fn on_finality_lag(&self, lag: u32) {
if let Some(metrics) = &self.0 {
metrics.finality_lag.set(lag.into());
Expand Down Expand Up @@ -320,6 +332,13 @@ fn register_metrics(registry: &Registry) -> Result<Metrics> {
)?,
registry,
)?,
para_on_demand_orders: prometheus_endpoint::register(
GaugeVec::new(
Opts::new("pc_para_on_demand_orders", "Parachain's on demand orders"),
&["parachain_id"],
)?,
registry,
)?,
finality_lag: prometheus_endpoint::register(
Gauge::new("pc_finality_lag", "Finality lag")?,
registry,
Expand Down
20 changes: 19 additions & 1 deletion parachain-tracer/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use polkadot_introspector_essentials::{
chain_events::SubxtDisputeResult,
collector::{candidate_record::CandidateRecord, CollectorPrefixType, CollectorStorageApi, DisputeInfo},
metadata::polkadot_primitives,
types::{AccountId32, BlockNumber, CoreOccupied, Timestamp, H256},
types::{AccountId32, BlockNumber, CoreOccupied, OnDemandOrder, Timestamp, H256},
};
use std::{
collections::{BTreeMap, HashMap},
Expand Down Expand Up @@ -154,6 +154,8 @@ pub struct SubxtTracker {
disputes: Vec<DisputesTracker>,
/// Current relay chain block timestamp.
current_relay_block_ts: Option<Timestamp>,
/// Current on-demand order
on_demand_order: Option<OnDemandOrder>,
/// Last observed finality lag
finality_lag: Option<u32>,
/// Last relay chain block timestamp.
Expand Down Expand Up @@ -247,6 +249,8 @@ impl ParachainBlockTracker for SubxtTracker {
error!("Failed to get inherent data for {:?}", block_hash);
}

self.set_on_demand_order(block_hash).await;

Ok(&self.current_candidate)
}

Expand Down Expand Up @@ -343,6 +347,10 @@ impl ParachainBlockTracker for SubxtTracker {
metrics.on_block(tm.as_secs_f64(), self.para_id);
}

if let Some(ref order) = self.on_demand_order {
metrics.on_on_demand_order(order);
}

if let Some(finality_lag) = self.finality_lag {
metrics.on_finality_lag(finality_lag);
}
Expand Down Expand Up @@ -374,6 +382,7 @@ impl SubxtTracker {
current_relay_block: None,
previous_relay_block: None,
current_relay_block_ts: None,
on_demand_order: None,
finality_lag: None,
disputes: Vec::new(),
last_assignment: None,
Expand All @@ -392,6 +401,15 @@ impl SubxtTracker {
self.current_relay_block = Some((block_number, block_hash));
}

async fn set_on_demand_order(&mut self, block_hash: H256) {
self.on_demand_order = self
.api
.storage()
.storage_read_prefixed(CollectorPrefixType::OnDemandOrder(self.para_id), block_hash)
.await
.map(|v| v.into_inner::<OnDemandOrder>().unwrap());
}

async fn get_session_keys(&self, session_index: u32) -> Option<Vec<AccountId32>> {
let session_hash = BlakeTwo256::hash(&session_index.to_be_bytes()[..]);
self.api
Expand Down

0 comments on commit bfb7edf

Please sign in to comment.