diff --git a/Cargo.lock b/Cargo.lock index 888c6b7c5f..9a006ae5ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3556,7 +3556,7 @@ dependencies = [ [[package]] name = "pyth-oracle" version = "2.32.1" -source = "git+https://github.com/pyth-network/pyth-client?tag=oracle-v2.32.1#94babc4096eb7ff804f5087fb9600ee110b9591d" +source = "git+https://github.com/pyth-network/pyth-client?branch=batch-publish#78f31c07647da5281ed8dbe32c4645069894fd83" dependencies = [ "bindgen 0.60.1", "bitflags 2.6.0", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 23e02ae54f..319d31b0b8 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -35,7 +35,7 @@ num-traits = { version = "0.2" } num_cpus = "1.13.1" once_cell = "1.12.0" ouroboros = "0.15.0" -pyth-oracle = { git = "https://github.com/pyth-network/pyth-client", tag = "oracle-v2.32.1", features = ["library"] } +pyth-oracle = { git = "https://github.com/pyth-network/pyth-client", branch = "batch-publish", features = ["library"] } pythnet-sdk = { git = "https://github.com/pyth-network/pyth-crosschain", version = "1.13.6", rev = "e670f57f89b05398ca352e4accb1e32724a8e1b4" } rand = "0.7.0" rayon = "1.5.3" diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 760901289b..e7a3bb53d8 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -199,6 +199,7 @@ mod sysvar_cache; mod transaction_account_state_info; pub mod pyth_accumulator; +mod pyth_batch_publish; #[cfg(test)] mod pyth_accumulator_tests; diff --git a/runtime/src/bank/pyth_accumulator.rs b/runtime/src/bank/pyth_accumulator.rs index eb34783073..e5a9d5a783 100644 --- a/runtime/src/bank/pyth_accumulator.rs +++ b/runtime/src/bank/pyth_accumulator.rs @@ -1,5 +1,5 @@ use { - super::Bank, + super::{pyth_batch_publish, Bank}, crate::accounts_index::{IndexKey, ScanConfig, ScanError}, byteorder::{LittleEndian, ReadBytesExt}, log::*, @@ -16,7 +16,10 @@ use { hash::hashv, pubkey::Pubkey, }, - std::env::{self, VarError}, + std::{ + collections::HashMap, + env::{self, VarError}, + }, }; pub const ACCUMULATOR_RING_SIZE: u32 = 10_000; @@ -44,6 +47,13 @@ lazy_static! { .parse() .unwrap(), ); + pub static ref BATCH_PUBLISH_PID: Pubkey = env_pubkey_or( + "BATCH_PUBLISH_PID", + // TODO: replace with real program id + "FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epA" + .parse() + .unwrap(), + ); } /// Accumulator specific error type. It would be nice to use `transaction::Error` but it does @@ -121,6 +131,7 @@ pub fn get_accumulator_keys() -> Vec<( ("ACCUMULATOR_SEQUENCE_ADDR", Ok(*ACCUMULATOR_SEQUENCE_ADDR)), ("WORMHOLE_PID", Ok(*WORMHOLE_PID)), ("ORACLE_PID", Ok(*ORACLE_PID)), + ("BATCH_PUBLISH_PID", Ok(*BATCH_PUBLISH_PID)), ] } @@ -413,19 +424,33 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV let mut any_v1_aggregations = false; let mut v2_messages = Vec::new(); + let new_prices = pyth_batch_publish::extract_batch_publish_prices(bank).unwrap_or_else(|err| { + warn!("extract_batch_publish_prices failed: {}", err); + HashMap::new() + }); + for (pubkey, mut account) in accounts { let mut price_account_data = account.data().to_owned(); + let price_account = if let Ok(data) = + pyth_oracle::validator::validate_price_account(&mut price_account_data) + { + data + } else { + continue; // Not a price account. + }; + + let mut need_save = + pyth_batch_publish::apply_published_prices(price_account, &new_prices, bank.slot()); // Perform Accumulation match pyth_oracle::validator::aggregate_price( bank.slot(), bank.clock().unix_timestamp, &pubkey.to_bytes().into(), - &mut price_account_data, + price_account, ) { Ok(messages) => { - account.set_data(price_account_data); - bank.store_account_and_update_capitalization(&pubkey, &account); + need_save = true; v2_messages.extend(messages); } Err(err) => match err { @@ -435,6 +460,10 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV } }, } + if need_save { + account.set_data(price_account_data); + bank.store_account_and_update_capitalization(&pubkey, &account); + } } measure.stop(); diff --git a/runtime/src/bank/pyth_accumulator_tests.rs b/runtime/src/bank/pyth_accumulator_tests.rs index e8736af9ec..d9cdddf217 100644 --- a/runtime/src/bank/pyth_accumulator_tests.rs +++ b/runtime/src/bank/pyth_accumulator_tests.rs @@ -6,11 +6,15 @@ use { AccountIndex, AccountSecondaryIndexes, AccountSecondaryIndexesIncludeExclude, }, bank::{ - pyth_accumulator::{get_accumulator_keys, ACCUMULATOR_RING_SIZE, ORACLE_PID}, + pyth_accumulator::{ + get_accumulator_keys, ACCUMULATOR_RING_SIZE, BATCH_PUBLISH_PID, ORACLE_PID, + }, + pyth_batch_publish::publisher_prices_account::{self, PublisherPrice}, Bank, }, genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo}, }, + bytemuck::{cast_slice, checked::from_bytes}, byteorder::{ByteOrder, LittleEndian, ReadBytesExt}, itertools::Itertools, pyth_oracle::{ @@ -45,7 +49,9 @@ fn create_new_bank_for_tests_with_index(genesis_config: &GenesisConfig) -> Bank AccountSecondaryIndexes { keys: Some(AccountSecondaryIndexesIncludeExclude { exclude: false, - keys: [*ORACLE_PID, *MESSAGE_BUFFER_PID].into_iter().collect(), + keys: [*ORACLE_PID, *MESSAGE_BUFFER_PID, *BATCH_PUBLISH_PID] + .into_iter() + .collect(), }), indexes: [AccountIndex::ProgramId].into_iter().collect(), }, @@ -899,3 +905,128 @@ fn test_get_accumulator_keys() { ]; assert_eq!(accumulator_keys, expected_pyth_keys); } + +#[test] +fn test_batch_publish() { + let leader_pubkey = solana_sdk::pubkey::new_rand(); + let GenesisConfigInfo { + mut genesis_config, .. + } = create_genesis_config_with_leader(5, &leader_pubkey, 3); + + // Set epoch length to 32 so we can advance epochs quickly. We also skip past slot 0 here + // due to slot 0 having special handling. + let slots_in_epoch = 32; + genesis_config.epoch_schedule = EpochSchedule::new(slots_in_epoch); + let mut bank = create_new_bank_for_tests_with_index(&genesis_config); + + let generate_publisher = |seed, new_prices| { + let publisher1_key = keypair_from_seed(seed).unwrap(); + + let (publisher1_prices_key, _bump) = Pubkey::find_program_address( + // TODO: real seed + &[ + b"PUBLISHER_PRICES_ACCOUNT", + &publisher1_key.pubkey().to_bytes(), + ], + &BATCH_PUBLISH_PID, + ); + let mut publisher1_prices_account = + AccountSharedData::new(42, publisher_prices_account::size(100), &BATCH_PUBLISH_PID); + { + let (header, prices) = publisher_prices_account::create( + publisher1_prices_account.data_mut(), + publisher1_key.pubkey().to_bytes(), + ) + .unwrap(); + publisher_prices_account::extend(header, prices, cast_slice(new_prices)).unwrap(); + } + bank.store_account(&publisher1_prices_key, &publisher1_prices_account); + + publisher1_key + }; + + let publishers = [ + generate_publisher( + &[1u8; 32], + &[ + PublisherPrice::new(1, 1, 10, 2).unwrap(), + PublisherPrice::new(2, 1, 20, 3).unwrap(), + ], + ), + generate_publisher( + &[2u8; 32], + &[ + PublisherPrice::new(1, 1, 15, 2).unwrap(), + PublisherPrice::new(2, 1, 25, 3).unwrap(), + ], + ), + ]; + + let generate_price = |seeds, index| { + let (price_feed_key, _bump) = Pubkey::find_program_address(&[seeds], &ORACLE_PID); + let mut price_feed_account = + AccountSharedData::new(42, size_of::(), &ORACLE_PID); + + let messages = { + let price_feed_info_key = &price_feed_key.to_bytes().into(); + let price_feed_info_lamports = &mut 0; + let price_feed_info_owner = &ORACLE_PID.to_bytes().into(); + let price_feed_info_data = price_feed_account.data_mut(); + let price_feed_info = AccountInfo::new( + price_feed_info_key, + false, + true, + price_feed_info_lamports, + price_feed_info_data, + price_feed_info_owner, + false, + Epoch::default(), + ); + + let mut price_account = PriceAccount::initialize(&price_feed_info, 0).unwrap(); + price_account.flags.insert( + PriceAccountFlags::ACCUMULATOR_V2 | PriceAccountFlags::MESSAGE_BUFFER_CLEARED, + ); + price_account.unused_3_ = index; + price_account.comp_[0].pub_ = publishers[0].pubkey().to_bytes().into(); + price_account.comp_[1].pub_ = publishers[1].pubkey().to_bytes().into(); + price_account.num_ = 2; + }; + + bank.store_account(&price_feed_key, &price_feed_account); + (price_feed_key, messages) + }; + + assert!(bank + .feature_set + .is_active(&feature_set::enable_accumulator_sysvar::id())); + assert!(bank + .feature_set + .is_active(&feature_set::move_accumulator_to_end_of_block::id())); + assert!(bank + .feature_set + .is_active(&feature_set::undo_move_accumulator_to_end_of_block::id())); + assert!(bank + .feature_set + .is_active(&feature_set::redo_move_accumulator_to_end_of_block::id())); + + let prices_with_messages = [ + generate_price(b"seeds_1", 1), + generate_price(b"seeds_2", 2), + generate_price(b"seeds_3", 3), + generate_price(b"seeds_4", 4), + ]; + + bank = new_from_parent(&Arc::new(bank)); // Advance slot 1. + bank = new_from_parent(&Arc::new(bank)); // Advance slot 2. + + let new_price_feed1_account = bank.get_account(&prices_with_messages[0].0).unwrap(); + let new_price_feed1_data: &PriceAccount = from_bytes(new_price_feed1_account.data()); + assert_eq!(new_price_feed1_data.comp_[0].latest_.price_, 10); + assert_eq!(new_price_feed1_data.comp_[1].latest_.price_, 15); + + let new_price_feed2_account = bank.get_account(&prices_with_messages[1].0).unwrap(); + let new_price_feed2_data: &PriceAccount = from_bytes(new_price_feed2_account.data()); + assert_eq!(new_price_feed2_data.comp_[0].latest_.price_, 20); + assert_eq!(new_price_feed2_data.comp_[1].latest_.price_, 25); +} diff --git a/runtime/src/bank/pyth_batch_publish.rs b/runtime/src/bank/pyth_batch_publish.rs new file mode 100644 index 0000000000..68a0dec0aa --- /dev/null +++ b/runtime/src/bank/pyth_batch_publish.rs @@ -0,0 +1,310 @@ +use { + super::{pyth_accumulator::BATCH_PUBLISH_PID, Bank}, + crate::accounts_index::{IndexKey, ScanConfig, ScanError}, + log::warn, + pyth_oracle::{ + find_publisher_index, get_status_for_conf_price_ratio, solana_program::pubkey::Pubkey, + OracleError, PriceAccount, + }, + solana_sdk::{account::ReadableAccount, clock::Slot}, + std::collections::HashMap, + thiserror::Error, +}; + +#[allow(dead_code)] +pub mod publisher_prices_account { + use { + bytemuck::{cast_slice, from_bytes, from_bytes_mut, Pod, Zeroable}, + solana_sdk::clock::Slot, + std::mem::size_of, + thiserror::Error, + }; + + const FORMAT: u32 = 2848712303; + + #[derive(Debug, Clone, Copy, Zeroable, Pod)] + #[repr(C, packed)] + pub struct PublisherPricesHeader { + pub format: u32, + pub publisher: [u8; 32], + pub slot: Slot, + pub num_prices: u32, + } + + impl PublisherPricesHeader { + fn new(publisher: [u8; 32]) -> Self { + PublisherPricesHeader { + format: FORMAT, + publisher, + slot: 0, + num_prices: 0, + } + } + } + + #[derive(Debug, Clone, Copy, Zeroable, Pod)] + #[repr(C, packed)] + pub struct PublisherPrice { + // 4 high bits: trading status + // 28 low bits: feed index + pub trading_status_and_feed_index: u32, + pub price: i64, + pub confidence: u64, + } + + #[derive(Debug, Error)] + #[error("publisher price data overflow")] + pub struct PublisherPriceError; + + impl PublisherPrice { + pub fn new( + feed_index: u32, + trading_status: u32, + price: i64, + confidence: u64, + ) -> Result { + if feed_index >= (1 << 28) || trading_status >= (1 << 4) { + return Err(PublisherPriceError); + } + Ok(Self { + trading_status_and_feed_index: (trading_status << 28) | feed_index, + price, + confidence, + }) + } + + pub fn trading_status(&self) -> u32 { + self.trading_status_and_feed_index >> 28 + } + + pub fn feed_index(&self) -> u32 { + self.trading_status_and_feed_index & ((1 << 28) - 1) + } + } + + #[derive(Debug, Error)] + pub enum ReadAccountError { + #[error("data too short")] + DataTooShort, + #[error("format mismatch")] + FormatMismatch, + #[error("invalid num prices")] + InvalidNumPrices, + } + + #[derive(Debug, Error)] + pub enum ExtendError { + #[error("not enough space")] + NotEnoughSpace, + #[error("invalid length")] + InvalidLength, + } + + pub fn read( + data: &[u8], + ) -> Result<(&PublisherPricesHeader, &[PublisherPrice]), ReadAccountError> { + if data.len() < size_of::() { + return Err(ReadAccountError::DataTooShort); + } + let header: &PublisherPricesHeader = + from_bytes(&data[..size_of::()]); + if header.format != FORMAT { + return Err(ReadAccountError::FormatMismatch); + } + let prices_bytes = &data[size_of::()..]; + let num_prices: usize = header.num_prices.try_into().unwrap(); + let expected_len = num_prices.saturating_mul(size_of::()); + if expected_len > prices_bytes.len() { + return Err(ReadAccountError::InvalidNumPrices); + } + let prices = cast_slice(&prices_bytes[..expected_len]); + Ok((header, prices)) + } + + pub fn size(max_prices: usize) -> usize { + size_of::() + max_prices * size_of::() + } + + pub fn read_mut( + data: &mut [u8], + ) -> Result<(&mut PublisherPricesHeader, &mut [u8]), ReadAccountError> { + if data.len() < size_of::() { + return Err(ReadAccountError::DataTooShort); + } + let (header, prices) = data.split_at_mut(size_of::()); + let header: &mut PublisherPricesHeader = from_bytes_mut(header); + if header.format != FORMAT { + return Err(ReadAccountError::FormatMismatch); + } + Ok((header, prices)) + } + + pub fn create( + data: &mut [u8], + publisher: [u8; 32], + ) -> Result<(&mut PublisherPricesHeader, &mut [u8]), ReadAccountError> { + if data.len() < size_of::() { + return Err(ReadAccountError::DataTooShort); + } + let (header, prices) = data.split_at_mut(size_of::()); + let header: &mut PublisherPricesHeader = from_bytes_mut(header); + *header = PublisherPricesHeader::new(publisher); + Ok((header, prices)) + } + + pub fn extend( + header: &mut PublisherPricesHeader, + prices: &mut [u8], + new_prices: &[u8], + ) -> Result<(), ExtendError> { + if new_prices.len() % size_of::() != 0 { + return Err(ExtendError::InvalidLength); + } + let num_new_prices = (new_prices.len() / size_of::()) + .try_into() + .expect("unexpected overflow"); + let num_prices: usize = header.num_prices.try_into().unwrap(); + let start = size_of::() * num_prices; + let end = size_of::() * num_prices + new_prices.len(); + header.num_prices = header + .num_prices + .checked_add(num_new_prices) + .expect("unexpected overflow"); + prices + .get_mut(start..end) + .ok_or(ExtendError::NotEnoughSpace)? + .copy_from_slice(new_prices); + Ok(()) + } +} + +#[derive(Debug, Error)] +pub enum HandleBatchPublishError { + #[error("failed to get program accounts: {0}")] + GetProgramAccounts(#[from] ScanError), +} + +#[derive(Debug)] +pub struct PublisherPriceValue { + pub publisher: Pubkey, + pub trading_status: u32, + pub price: i64, + pub confidence: u64, +} + +pub fn extract_batch_publish_prices( + bank: &Bank, +) -> Result>, HandleBatchPublishError> { + assert!( + bank.account_indexes_include_key(&*BATCH_PUBLISH_PID), + "Oracle program account index missing" + ); + + let publisher_prices_accounts = bank + .get_filtered_indexed_accounts( + &IndexKey::ProgramId(*BATCH_PUBLISH_PID), + |account| account.owner() == &*BATCH_PUBLISH_PID, + &ScanConfig::new(true), + None, + ) + .map_err(HandleBatchPublishError::GetProgramAccounts)?; + + let mut all_prices = HashMap::>::new(); + for (account_key, account) in publisher_prices_accounts { + let (header, prices) = match publisher_prices_account::read(account.data()) { + Ok(r) => r, + Err(err) => { + warn!("invalid publisher prices account {}: {}", account_key, err); + continue; + } + }; + if header.slot != bank.slot() { + // Updates from earlier slots have already been applied. + continue; + } + let publisher = header.publisher.into(); + for price in prices { + all_prices + .entry(price.feed_index()) + .or_default() + .push(PublisherPriceValue { + publisher, + trading_status: price.trading_status(), + price: price.price, + confidence: price.confidence, + }); + } + } + Ok(all_prices) +} + +pub fn apply_published_prices( + price_data: &mut PriceAccount, + new_prices: &HashMap>, + slot: Slot, +) -> bool { + // TODO: store index here or somewhere else? + let price_feed_index = price_data.unused_3_ as u32; + let mut any_update = false; + for new_price in new_prices.get(&price_feed_index).unwrap_or(&Vec::new()) { + match apply_published_price(price_data, new_price, slot) { + Ok(()) => { + any_update = true; + } + Err(err) => { + warn!( + "failed to apply publisher price to price feed {}: {}", + price_data.unused_3_ as u32, err + ); + } + } + } + any_update +} + +#[derive(Debug, Error)] +enum ApplyPublishedPriceError { + #[error("publisher {1} is not allowed to publish prices for feed {0}")] + NoPermission(u32, Pubkey), + #[error("bad conf price ratio: {0}")] + BadConfPriceRatio(#[from] OracleError), + #[error("invalid publishers num_")] + InvalidPublishersNum, + #[error("invalid publisher index")] + InvalidPublisherIndex, +} + +fn apply_published_price( + price_data: &mut PriceAccount, + new_price: &PublisherPriceValue, + slot: Slot, +) -> Result<(), ApplyPublishedPriceError> { + let publishers = price_data + .comp_ + .get(..price_data.num_.try_into().unwrap()) + .ok_or(ApplyPublishedPriceError::InvalidPublishersNum)?; + + let publisher_index = find_publisher_index(publishers, &new_price.publisher).ok_or( + ApplyPublishedPriceError::NoPermission(price_data.unused_3_ as u32, new_price.publisher), + )?; + + // IMPORTANT: If the publisher does not meet the price/conf + // ratio condition, its price will not count for the next + // aggregate. + let status: u32 = get_status_for_conf_price_ratio( + new_price.price, + new_price.confidence, + new_price.trading_status, + )?; + + let publisher_price = &mut price_data + .comp_ + .get_mut(publisher_index) + .ok_or(ApplyPublishedPriceError::InvalidPublisherIndex)? + .latest_; + publisher_price.price_ = new_price.price; + publisher_price.conf_ = new_price.confidence; + publisher_price.status_ = status; + publisher_price.pub_slot_ = slot; + Ok(()) +} diff --git a/validator/src/main.rs b/validator/src/main.rs index 13d510e8d2..1afe140da6 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -52,7 +52,7 @@ use { AccountIndex, AccountSecondaryIndexes, AccountSecondaryIndexesIncludeExclude, AccountsIndexConfig, IndexLimitMb, }, - bank::pyth_accumulator::{MESSAGE_BUFFER_PID, ORACLE_PID}, + bank::pyth_accumulator::{BATCH_PUBLISH_PID, MESSAGE_BUFFER_PID, ORACLE_PID}, hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, runtime_config::RuntimeConfig, snapshot_config::SnapshotConfig, @@ -3177,9 +3177,11 @@ fn process_account_indexes(matches: &ArgMatches) -> AccountSecondaryIndexes { }) .collect(); - assert!(account_indexes.contains(&AccountIndex::ProgramId), + assert!( + account_indexes.contains(&AccountIndex::ProgramId), "The indexing should be enabled for program-id accounts. Add the following flag:\n\ - --account-index program-id\n"); + --account-index program-id\n" + ); let account_indexes_include_keys: HashSet = values_t!(matches, "account_index_include_key", Pubkey) @@ -3199,18 +3201,25 @@ fn process_account_indexes(matches: &ArgMatches) -> AccountSecondaryIndexes { let include_keys = !account_indexes_include_keys.is_empty(); if include_keys { - if !account_indexes_include_keys.contains(&*ORACLE_PID) || !account_indexes_include_keys.contains(&*MESSAGE_BUFFER_PID) { + if !account_indexes_include_keys.contains(&*ORACLE_PID) + || !account_indexes_include_keys.contains(&*MESSAGE_BUFFER_PID) + || !account_indexes_include_keys.contains(&*BATCH_PUBLISH_PID) + { panic!( "The oracle program id and message buffer program id must be included in the account index. Add the following flags\n\ --account-index-include-key {}\n\ + --account-index-include-key {}\n\ --account-index-include-key {}\n", - &*ORACLE_PID, &*MESSAGE_BUFFER_PID + &*ORACLE_PID, &*MESSAGE_BUFFER_PID, &*BATCH_PUBLISH_PID, ); } } if exclude_keys { - if account_indexes_exclude_keys.contains(&*ORACLE_PID) || account_indexes_exclude_keys.contains(&*MESSAGE_BUFFER_PID) { + if account_indexes_exclude_keys.contains(&*ORACLE_PID) + || account_indexes_exclude_keys.contains(&*MESSAGE_BUFFER_PID) + || account_indexes_exclude_keys.contains(&*BATCH_PUBLISH_PID) + { panic!("The oracle program id and message buffer program id must *not* be excluded from the account index."); } }