From 9ceff438220c15500494bf952d125638d927b6ae Mon Sep 17 00:00:00 2001 From: Pavel Strakhov Date: Wed, 28 Aug 2024 10:05:02 +0100 Subject: [PATCH] feat: integrate pyth-price-publisher --- Cargo.lock | 11 + runtime/Cargo.toml | 1 + runtime/src/bank/pyth/accumulator.rs | 11 +- runtime/src/bank/pyth/batch_publish.rs | 192 ++---------------- .../bank/pyth/tests/batch_publish_tests.rs | 10 +- validator/src/main.rs | 15 +- 6 files changed, 53 insertions(+), 187 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0a652e01bd..4a3965d287 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3615,6 +3615,16 @@ dependencies = [ "thiserror", ] +[[package]] +name = "pyth-price-publisher" +version = "0.1.0" +source = "git+https://github.com/pyth-network/pyth-crosschain?branch=add-publisher-program#1fa5089f0f5abf1fe1d48ba0dda489843fcd545a" +dependencies = [ + "bytemuck", + "solana-program 1.14.17", + "thiserror", +] + [[package]] name = "pythnet-sdk" version = "1.13.6" @@ -6200,6 +6210,7 @@ dependencies = [ "once_cell", "ouroboros", "pyth-oracle", + "pyth-price-publisher", "pythnet-sdk 1.13.6", "rand 0.7.3", "rand_chacha 0.2.2", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index ac5b87ca27..e7628442e1 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -37,6 +37,7 @@ once_cell = "1.12.0" ouroboros = "0.15.0" pyth-oracle = { git = "https://github.com/pyth-network/pyth-client", branch = "batch-publish", features = ["library"] } pythnet-sdk = { git = "https://github.com/pyth-network/pyth-crosschain", version = "1.13.6", rev = "33f901aa45f4f0005aa5a84a1479b78ca9033074" } +pyth-price-publisher = { git = "https://github.com/pyth-network/pyth-crosschain", branch = "add-publisher-program" } rand = "0.7.0" rayon = "1.5.3" regex = "1.5.6" diff --git a/runtime/src/bank/pyth/accumulator.rs b/runtime/src/bank/pyth/accumulator.rs index 2c0ae04cc8..3577b2ff59 100644 --- a/runtime/src/bank/pyth/accumulator.rs +++ b/runtime/src/bank/pyth/accumulator.rs @@ -5,6 +5,7 @@ use { bank::Bank, }, byteorder::{LittleEndian, ReadBytesExt}, + itertools::Itertools, log::*, pyth_oracle::validator::AggregationError, pythnet_sdk::{ @@ -441,7 +442,7 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV v2_messages.push(publisher_stake_caps_message); } - let new_prices = batch_publish::extract_batch_publish_prices(bank).unwrap_or_else(|err| { + let mut new_prices = batch_publish::extract_batch_publish_prices(bank).unwrap_or_else(|err| { warn!("extract_batch_publish_prices failed: {}", err); HashMap::new() }); @@ -458,7 +459,7 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV }; let mut need_save = - batch_publish::apply_published_prices(price_account, &new_prices, bank.slot()); + batch_publish::apply_published_prices(price_account, &mut new_prices, bank.slot()); // Perform Accumulation match pyth_oracle::validator::aggregate_price( @@ -483,6 +484,12 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV bank.store_account_and_update_capitalization(&pubkey, &account); } } + if !new_prices.is_empty() { + warn!( + "pyth batch publish: missing price feed accounts for indexes: {}", + new_prices.keys().join(", ") + ); + } measure.stop(); debug!( diff --git a/runtime/src/bank/pyth/batch_publish.rs b/runtime/src/bank/pyth/batch_publish.rs index e8302f8cc5..a7a7962dc6 100644 --- a/runtime/src/bank/pyth/batch_publish.rs +++ b/runtime/src/bank/pyth/batch_publish.rs @@ -4,184 +4,17 @@ use { accounts_index::{IndexKey, ScanConfig, ScanError}, bank::Bank, }, - log::warn, + log::{info, warn}, pyth_oracle::{ find_publisher_index, get_status_for_conf_price_ratio, solana_program::pubkey::Pubkey, OracleError, PriceAccount, }, + pyth_price_publisher::accounts::publisher_prices as publisher_prices_account, solana_sdk::{account::ReadableAccount, clock::Slot}, std::collections::HashMap, thiserror::Error, }; -// TODO: move to the publish program -#[allow(dead_code)] -pub mod publisher_prices_account { - use { - bytemuck::{cast_slice, from_bytes, from_bytes_mut, Pod, Zeroable}, - solana_sdk::clock::Slot, - std::mem::size_of, - thiserror::Error, - }; - - const FORMAT: u32 = 2848712303; - - #[derive(Debug, Clone, Copy, Zeroable, Pod)] - #[repr(C, packed)] - pub struct PublisherPricesHeader { - pub format: u32, - pub publisher: [u8; 32], - pub slot: Slot, - pub num_prices: u32, - } - - impl PublisherPricesHeader { - fn new(publisher: [u8; 32]) -> Self { - PublisherPricesHeader { - format: FORMAT, - publisher, - slot: 0, - num_prices: 0, - } - } - } - - #[derive(Debug, Clone, Copy, Zeroable, Pod)] - #[repr(C, packed)] - pub struct PublisherPrice { - // 4 high bits: trading status - // 28 low bits: feed index - pub trading_status_and_feed_index: u32, - pub price: i64, - pub confidence: u64, - } - - #[derive(Debug, Error)] - #[error("publisher price data overflow")] - pub struct PublisherPriceError; - - impl PublisherPrice { - pub fn new( - feed_index: u32, - trading_status: u32, - price: i64, - confidence: u64, - ) -> Result { - if feed_index >= (1 << 28) || trading_status >= (1 << 4) { - return Err(PublisherPriceError); - } - Ok(Self { - trading_status_and_feed_index: (trading_status << 28) | feed_index, - price, - confidence, - }) - } - - pub fn trading_status(&self) -> u32 { - self.trading_status_and_feed_index >> 28 - } - - pub fn feed_index(&self) -> u32 { - self.trading_status_and_feed_index & ((1 << 28) - 1) - } - } - - #[derive(Debug, Error)] - pub enum ReadAccountError { - #[error("data too short")] - DataTooShort, - #[error("format mismatch")] - FormatMismatch, - #[error("invalid num prices")] - InvalidNumPrices, - } - - #[derive(Debug, Error)] - pub enum ExtendError { - #[error("not enough space")] - NotEnoughSpace, - #[error("invalid length")] - InvalidLength, - } - - pub fn read( - data: &[u8], - ) -> Result<(&PublisherPricesHeader, &[PublisherPrice]), ReadAccountError> { - if data.len() < size_of::() { - return Err(ReadAccountError::DataTooShort); - } - let header: &PublisherPricesHeader = - from_bytes(&data[..size_of::()]); - if header.format != FORMAT { - return Err(ReadAccountError::FormatMismatch); - } - let prices_bytes = &data[size_of::()..]; - let num_prices: usize = header.num_prices.try_into().unwrap(); - let expected_len = num_prices.saturating_mul(size_of::()); - if expected_len > prices_bytes.len() { - return Err(ReadAccountError::InvalidNumPrices); - } - let prices = cast_slice(&prices_bytes[..expected_len]); - Ok((header, prices)) - } - - pub fn size(max_prices: usize) -> usize { - size_of::() + max_prices * size_of::() - } - - pub fn read_mut( - data: &mut [u8], - ) -> Result<(&mut PublisherPricesHeader, &mut [u8]), ReadAccountError> { - if data.len() < size_of::() { - return Err(ReadAccountError::DataTooShort); - } - let (header, prices) = data.split_at_mut(size_of::()); - let header: &mut PublisherPricesHeader = from_bytes_mut(header); - if header.format != FORMAT { - return Err(ReadAccountError::FormatMismatch); - } - Ok((header, prices)) - } - - pub fn create( - data: &mut [u8], - publisher: [u8; 32], - ) -> Result<(&mut PublisherPricesHeader, &mut [u8]), ReadAccountError> { - if data.len() < size_of::() { - return Err(ReadAccountError::DataTooShort); - } - let (header, prices) = data.split_at_mut(size_of::()); - let header: &mut PublisherPricesHeader = from_bytes_mut(header); - *header = PublisherPricesHeader::new(publisher); - Ok((header, prices)) - } - - pub fn extend( - header: &mut PublisherPricesHeader, - prices: &mut [u8], - new_prices: &[u8], - ) -> Result<(), ExtendError> { - if new_prices.len() % size_of::() != 0 { - return Err(ExtendError::InvalidLength); - } - let num_new_prices = (new_prices.len() / size_of::()) - .try_into() - .expect("unexpected overflow"); - let num_prices: usize = header.num_prices.try_into().unwrap(); - let start = size_of::() * num_prices; - let end = size_of::() * num_prices + new_prices.len(); - header.num_prices = header - .num_prices - .checked_add(num_new_prices) - .expect("unexpected overflow"); - prices - .get_mut(start..end) - .ok_or(ExtendError::NotEnoughSpace)? - .copy_from_slice(new_prices); - Ok(()) - } -} - #[derive(Debug, Error)] pub enum HandleBatchPublishError { #[error("failed to get program accounts: {0}")] @@ -214,7 +47,12 @@ pub fn extract_batch_publish_prices( .map_err(HandleBatchPublishError::GetProgramAccounts)?; let mut all_prices = HashMap::>::new(); + let mut num_found_accounts = 0; + let mut num_found_prices = 0; for (account_key, account) in publisher_prices_accounts { + if !publisher_prices_account::format_matches(account.data()) { + continue; + } let (header, prices) = match publisher_prices_account::read(account.data()) { Ok(r) => r, Err(err) => { @@ -222,6 +60,7 @@ pub fn extract_batch_publish_prices( continue; } }; + num_found_accounts += 1; if header.slot != bank.slot() { // Updates from earlier slots have already been applied. continue; @@ -238,14 +77,21 @@ pub fn extract_batch_publish_prices( .entry(price.feed_index()) .or_default() .push(value); + num_found_prices += 1; } } + info!( + "pyth batch publish: found {} prices in {} accounts at slot {}", + num_found_prices, + num_found_accounts, + bank.slot() + ); Ok(all_prices) } pub fn apply_published_prices( price_data: &mut PriceAccount, - new_prices: &HashMap>, + new_prices: &mut HashMap>, slot: Slot, ) -> bool { if price_data.feed_index == 0 { @@ -253,10 +99,10 @@ pub fn apply_published_prices( } let mut any_update = false; for new_price in new_prices - .get(&price_data.feed_index) - .unwrap_or(&Vec::new()) + .remove(&price_data.feed_index) + .unwrap_or_default() { - match apply_published_price(price_data, new_price, slot) { + match apply_published_price(price_data, &new_price, slot) { Ok(()) => { any_update = true; } diff --git a/runtime/src/bank/pyth/tests/batch_publish_tests.rs b/runtime/src/bank/pyth/tests/batch_publish_tests.rs index 65a1a590dd..59bb3e0856 100644 --- a/runtime/src/bank/pyth/tests/batch_publish_tests.rs +++ b/runtime/src/bank/pyth/tests/batch_publish_tests.rs @@ -2,7 +2,6 @@ use { crate::{ bank::pyth::{ accumulator::{BATCH_PUBLISH_PID, ORACLE_PID}, - batch_publish::publisher_prices_account::{self, PublisherPrice}, tests::{create_new_bank_for_tests_with_index, new_from_parent}, }, genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}, @@ -11,6 +10,9 @@ use { pyth_oracle::{ solana_program::account_info::AccountInfo, PriceAccount, PriceAccountFlags, PythAccount, }, + pyth_price_publisher::accounts::publisher_prices::{ + self as publisher_prices_account, PublisherPrice, + }, solana_sdk::{ account::{AccountSharedData, ReadableAccount, WritableAccount}, clock::Epoch, @@ -40,11 +42,7 @@ fn test_batch_publish() { let publisher1_key = keypair_from_seed(seed).unwrap(); let (publisher1_prices_key, _bump) = Pubkey::find_program_address( - // TODO: real seed - &[ - b"PUBLISHER_PRICES_ACCOUNT", - &publisher1_key.pubkey().to_bytes(), - ], + &[b"BUFFER", &publisher1_key.pubkey().to_bytes()], &BATCH_PUBLISH_PID, ); let mut publisher1_prices_account = diff --git a/validator/src/main.rs b/validator/src/main.rs index a86c88b488..4103606a42 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -3214,12 +3214,15 @@ fn process_account_indexes(matches: &ArgMatches) -> AccountSecondaryIndexes { ); } - if exclude_keys - && (account_indexes_exclude_keys.contains(&*ORACLE_PID) - || account_indexes_exclude_keys.contains(&*MESSAGE_BUFFER_PID) - || account_indexes_exclude_keys.contains(&*BATCH_PUBLISH_PID)) - { - panic!("The oracle program id and message buffer program id must *not* be excluded from the account index."); + if exclude_keys { + for key in &[&*ORACLE_PID, &*MESSAGE_BUFFER_PID, &*BATCH_PUBLISH_PID] { + if account_indexes_exclude_keys.contains(key) { + panic!( + "This key must *not* be excluded from the account index: {}", + key + ); + } + } } let keys = if !account_indexes.is_empty() && (exclude_keys || include_keys) {