Skip to content

Commit

Permalink
feat: integrate pyth-price-publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
Riateche committed Aug 28, 2024
1 parent 2611090 commit 9ceff43
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 187 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ once_cell = "1.12.0"
ouroboros = "0.15.0"
pyth-oracle = { git = "https://github.com/pyth-network/pyth-client", branch = "batch-publish", features = ["library"] }
pythnet-sdk = { git = "https://github.com/pyth-network/pyth-crosschain", version = "1.13.6", rev = "33f901aa45f4f0005aa5a84a1479b78ca9033074" }
pyth-price-publisher = { git = "https://github.com/pyth-network/pyth-crosschain", branch = "add-publisher-program" }
rand = "0.7.0"
rayon = "1.5.3"
regex = "1.5.6"
Expand Down
11 changes: 9 additions & 2 deletions runtime/src/bank/pyth/accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
bank::Bank,
},
byteorder::{LittleEndian, ReadBytesExt},
itertools::Itertools,
log::*,
pyth_oracle::validator::AggregationError,
pythnet_sdk::{
Expand Down Expand Up @@ -441,7 +442,7 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV
v2_messages.push(publisher_stake_caps_message);
}

let new_prices = batch_publish::extract_batch_publish_prices(bank).unwrap_or_else(|err| {
let mut new_prices = batch_publish::extract_batch_publish_prices(bank).unwrap_or_else(|err| {
warn!("extract_batch_publish_prices failed: {}", err);
HashMap::new()
});
Expand All @@ -458,7 +459,7 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV
};

let mut need_save =
batch_publish::apply_published_prices(price_account, &new_prices, bank.slot());
batch_publish::apply_published_prices(price_account, &mut new_prices, bank.slot());

// Perform Accumulation
match pyth_oracle::validator::aggregate_price(
Expand All @@ -483,6 +484,12 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV
bank.store_account_and_update_capitalization(&pubkey, &account);
}
}
if !new_prices.is_empty() {
warn!(
"pyth batch publish: missing price feed accounts for indexes: {}",
new_prices.keys().join(", ")
);
}

measure.stop();
debug!(
Expand Down
192 changes: 19 additions & 173 deletions runtime/src/bank/pyth/batch_publish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,184 +4,17 @@ use {
accounts_index::{IndexKey, ScanConfig, ScanError},
bank::Bank,
},
log::warn,
log::{info, warn},
pyth_oracle::{
find_publisher_index, get_status_for_conf_price_ratio, solana_program::pubkey::Pubkey,
OracleError, PriceAccount,
},
pyth_price_publisher::accounts::publisher_prices as publisher_prices_account,
solana_sdk::{account::ReadableAccount, clock::Slot},
std::collections::HashMap,
thiserror::Error,
};

// TODO: move to the publish program
#[allow(dead_code)]
pub mod publisher_prices_account {
use {
bytemuck::{cast_slice, from_bytes, from_bytes_mut, Pod, Zeroable},
solana_sdk::clock::Slot,
std::mem::size_of,
thiserror::Error,
};

const FORMAT: u32 = 2848712303;

#[derive(Debug, Clone, Copy, Zeroable, Pod)]
#[repr(C, packed)]
pub struct PublisherPricesHeader {
pub format: u32,
pub publisher: [u8; 32],
pub slot: Slot,
pub num_prices: u32,
}

impl PublisherPricesHeader {
fn new(publisher: [u8; 32]) -> Self {
PublisherPricesHeader {
format: FORMAT,
publisher,
slot: 0,
num_prices: 0,
}
}
}

#[derive(Debug, Clone, Copy, Zeroable, Pod)]
#[repr(C, packed)]
pub struct PublisherPrice {
// 4 high bits: trading status
// 28 low bits: feed index
pub trading_status_and_feed_index: u32,
pub price: i64,
pub confidence: u64,
}

#[derive(Debug, Error)]
#[error("publisher price data overflow")]
pub struct PublisherPriceError;

impl PublisherPrice {
pub fn new(
feed_index: u32,
trading_status: u32,
price: i64,
confidence: u64,
) -> Result<Self, PublisherPriceError> {
if feed_index >= (1 << 28) || trading_status >= (1 << 4) {
return Err(PublisherPriceError);
}
Ok(Self {
trading_status_and_feed_index: (trading_status << 28) | feed_index,
price,
confidence,
})
}

pub fn trading_status(&self) -> u32 {
self.trading_status_and_feed_index >> 28
}

pub fn feed_index(&self) -> u32 {
self.trading_status_and_feed_index & ((1 << 28) - 1)
}
}

#[derive(Debug, Error)]
pub enum ReadAccountError {
#[error("data too short")]
DataTooShort,
#[error("format mismatch")]
FormatMismatch,
#[error("invalid num prices")]
InvalidNumPrices,
}

#[derive(Debug, Error)]
pub enum ExtendError {
#[error("not enough space")]
NotEnoughSpace,
#[error("invalid length")]
InvalidLength,
}

pub fn read(
data: &[u8],
) -> Result<(&PublisherPricesHeader, &[PublisherPrice]), ReadAccountError> {
if data.len() < size_of::<PublisherPricesHeader>() {
return Err(ReadAccountError::DataTooShort);
}
let header: &PublisherPricesHeader =
from_bytes(&data[..size_of::<PublisherPricesHeader>()]);
if header.format != FORMAT {
return Err(ReadAccountError::FormatMismatch);
}
let prices_bytes = &data[size_of::<PublisherPricesHeader>()..];
let num_prices: usize = header.num_prices.try_into().unwrap();
let expected_len = num_prices.saturating_mul(size_of::<PublisherPrice>());
if expected_len > prices_bytes.len() {
return Err(ReadAccountError::InvalidNumPrices);
}
let prices = cast_slice(&prices_bytes[..expected_len]);
Ok((header, prices))
}

pub fn size(max_prices: usize) -> usize {
size_of::<PublisherPricesHeader>() + max_prices * size_of::<PublisherPrice>()
}

pub fn read_mut(
data: &mut [u8],
) -> Result<(&mut PublisherPricesHeader, &mut [u8]), ReadAccountError> {
if data.len() < size_of::<PublisherPricesHeader>() {
return Err(ReadAccountError::DataTooShort);
}
let (header, prices) = data.split_at_mut(size_of::<PublisherPricesHeader>());
let header: &mut PublisherPricesHeader = from_bytes_mut(header);
if header.format != FORMAT {
return Err(ReadAccountError::FormatMismatch);
}
Ok((header, prices))
}

pub fn create(
data: &mut [u8],
publisher: [u8; 32],
) -> Result<(&mut PublisherPricesHeader, &mut [u8]), ReadAccountError> {
if data.len() < size_of::<PublisherPricesHeader>() {
return Err(ReadAccountError::DataTooShort);
}
let (header, prices) = data.split_at_mut(size_of::<PublisherPricesHeader>());
let header: &mut PublisherPricesHeader = from_bytes_mut(header);
*header = PublisherPricesHeader::new(publisher);
Ok((header, prices))
}

pub fn extend(
header: &mut PublisherPricesHeader,
prices: &mut [u8],
new_prices: &[u8],
) -> Result<(), ExtendError> {
if new_prices.len() % size_of::<PublisherPrice>() != 0 {
return Err(ExtendError::InvalidLength);
}
let num_new_prices = (new_prices.len() / size_of::<PublisherPrice>())
.try_into()
.expect("unexpected overflow");
let num_prices: usize = header.num_prices.try_into().unwrap();
let start = size_of::<PublisherPrice>() * num_prices;
let end = size_of::<PublisherPrice>() * num_prices + new_prices.len();
header.num_prices = header
.num_prices
.checked_add(num_new_prices)
.expect("unexpected overflow");
prices
.get_mut(start..end)
.ok_or(ExtendError::NotEnoughSpace)?
.copy_from_slice(new_prices);
Ok(())
}
}

#[derive(Debug, Error)]
pub enum HandleBatchPublishError {
#[error("failed to get program accounts: {0}")]
Expand Down Expand Up @@ -214,14 +47,20 @@ pub fn extract_batch_publish_prices(
.map_err(HandleBatchPublishError::GetProgramAccounts)?;

let mut all_prices = HashMap::<u32, Vec<PublisherPriceValue>>::new();
let mut num_found_accounts = 0;
let mut num_found_prices = 0;
for (account_key, account) in publisher_prices_accounts {
if !publisher_prices_account::format_matches(account.data()) {
continue;
}
let (header, prices) = match publisher_prices_account::read(account.data()) {
Ok(r) => r,
Err(err) => {
warn!("invalid publisher prices account {}: {}", account_key, err);
continue;
}
};
num_found_accounts += 1;
if header.slot != bank.slot() {
// Updates from earlier slots have already been applied.
continue;
Expand All @@ -238,25 +77,32 @@ pub fn extract_batch_publish_prices(
.entry(price.feed_index())
.or_default()
.push(value);
num_found_prices += 1;
}
}
info!(
"pyth batch publish: found {} prices in {} accounts at slot {}",
num_found_prices,
num_found_accounts,
bank.slot()
);
Ok(all_prices)
}

pub fn apply_published_prices(
price_data: &mut PriceAccount,
new_prices: &HashMap<u32, Vec<PublisherPriceValue>>,
new_prices: &mut HashMap<u32, Vec<PublisherPriceValue>>,
slot: Slot,
) -> bool {
if price_data.feed_index == 0 {
return false;
}
let mut any_update = false;
for new_price in new_prices
.get(&price_data.feed_index)
.unwrap_or(&Vec::new())
.remove(&price_data.feed_index)
.unwrap_or_default()
{
match apply_published_price(price_data, new_price, slot) {
match apply_published_price(price_data, &new_price, slot) {
Ok(()) => {
any_update = true;
}
Expand Down
10 changes: 4 additions & 6 deletions runtime/src/bank/pyth/tests/batch_publish_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use {
crate::{
bank::pyth::{
accumulator::{BATCH_PUBLISH_PID, ORACLE_PID},
batch_publish::publisher_prices_account::{self, PublisherPrice},
tests::{create_new_bank_for_tests_with_index, new_from_parent},
},
genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo},
Expand All @@ -11,6 +10,9 @@ use {
pyth_oracle::{
solana_program::account_info::AccountInfo, PriceAccount, PriceAccountFlags, PythAccount,
},
pyth_price_publisher::accounts::publisher_prices::{
self as publisher_prices_account, PublisherPrice,
},
solana_sdk::{
account::{AccountSharedData, ReadableAccount, WritableAccount},
clock::Epoch,
Expand Down Expand Up @@ -40,11 +42,7 @@ fn test_batch_publish() {
let publisher1_key = keypair_from_seed(seed).unwrap();

let (publisher1_prices_key, _bump) = Pubkey::find_program_address(
// TODO: real seed
&[
b"PUBLISHER_PRICES_ACCOUNT",
&publisher1_key.pubkey().to_bytes(),
],
&[b"BUFFER", &publisher1_key.pubkey().to_bytes()],
&BATCH_PUBLISH_PID,
);
let mut publisher1_prices_account =
Expand Down
15 changes: 9 additions & 6 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3214,12 +3214,15 @@ fn process_account_indexes(matches: &ArgMatches) -> AccountSecondaryIndexes {
);
}

if exclude_keys
&& (account_indexes_exclude_keys.contains(&*ORACLE_PID)
|| account_indexes_exclude_keys.contains(&*MESSAGE_BUFFER_PID)
|| account_indexes_exclude_keys.contains(&*BATCH_PUBLISH_PID))
{
panic!("The oracle program id and message buffer program id must *not* be excluded from the account index.");
if exclude_keys {
for key in &[&*ORACLE_PID, &*MESSAGE_BUFFER_PID, &*BATCH_PUBLISH_PID] {
if account_indexes_exclude_keys.contains(key) {
panic!(
"This key must *not* be excluded from the account index: {}",
key
);
}
}
}

let keys = if !account_indexes.is_empty() && (exclude_keys || include_keys) {
Expand Down

0 comments on commit 9ceff43

Please sign in to comment.