Skip to content

Commit

Permalink
feat: batch publish
Browse files Browse the repository at this point in the history
  • Loading branch information
Riateche committed Jul 31, 2024
1 parent d873725 commit 872d1b2
Show file tree
Hide file tree
Showing 7 changed files with 495 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ num-traits = { version = "0.2" }
num_cpus = "1.13.1"
once_cell = "1.12.0"
ouroboros = "0.15.0"
pyth-oracle = { git = "https://github.com/pyth-network/pyth-client", tag = "oracle-v2.32.1", features = ["library"] }
pyth-oracle = { git = "https://github.com/pyth-network/pyth-client", branch = "batch-publish", features = ["library"] }
pythnet-sdk = { git = "https://github.com/pyth-network/pyth-crosschain", version = "1.13.6", rev = "e670f57f89b05398ca352e4accb1e32724a8e1b4" }
rand = "0.7.0"
rayon = "1.5.3"
Expand Down
1 change: 1 addition & 0 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ mod sysvar_cache;
mod transaction_account_state_info;

pub mod pyth_accumulator;
mod pyth_batch_publish;

#[cfg(test)]
mod pyth_accumulator_tests;
Expand Down
39 changes: 34 additions & 5 deletions runtime/src/bank/pyth_accumulator.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
super::Bank,
super::{pyth_batch_publish, Bank},
crate::accounts_index::{IndexKey, ScanConfig, ScanError},
byteorder::{LittleEndian, ReadBytesExt},
log::*,
Expand All @@ -16,7 +16,10 @@ use {
hash::hashv,
pubkey::Pubkey,
},
std::env::{self, VarError},
std::{
collections::HashMap,
env::{self, VarError},
},
};

pub const ACCUMULATOR_RING_SIZE: u32 = 10_000;
Expand Down Expand Up @@ -44,6 +47,13 @@ lazy_static! {
.parse()
.unwrap(),
);
pub static ref BATCH_PUBLISH_PID: Pubkey = env_pubkey_or(
"BATCH_PUBLISH_PID",
// TODO: replace with real program id
"FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epA"
.parse()
.unwrap(),
);
}

/// Accumulator specific error type. It would be nice to use `transaction::Error` but it does
Expand Down Expand Up @@ -121,6 +131,7 @@ pub fn get_accumulator_keys() -> Vec<(
("ACCUMULATOR_SEQUENCE_ADDR", Ok(*ACCUMULATOR_SEQUENCE_ADDR)),
("WORMHOLE_PID", Ok(*WORMHOLE_PID)),
("ORACLE_PID", Ok(*ORACLE_PID)),
("BATCH_PUBLISH_PID", Ok(*BATCH_PUBLISH_PID)),
]
}

Expand Down Expand Up @@ -413,19 +424,33 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV
let mut any_v1_aggregations = false;
let mut v2_messages = Vec::new();

let new_prices = pyth_batch_publish::extract_batch_publish_prices(bank).unwrap_or_else(|err| {
warn!("extract_batch_publish_prices failed: {}", err);
HashMap::new()
});

for (pubkey, mut account) in accounts {
let mut price_account_data = account.data().to_owned();
let price_account = if let Ok(data) =
pyth_oracle::validator::validate_price_account(&mut price_account_data)
{
data
} else {
continue; // Not a price account.
};

let mut need_save =
pyth_batch_publish::apply_published_prices(price_account, &new_prices, bank.slot());

// Perform Accumulation
match pyth_oracle::validator::aggregate_price(
bank.slot(),
bank.clock().unix_timestamp,
&pubkey.to_bytes().into(),
&mut price_account_data,
price_account,
) {
Ok(messages) => {
account.set_data(price_account_data);
bank.store_account_and_update_capitalization(&pubkey, &account);
need_save = true;
v2_messages.extend(messages);
}
Err(err) => match err {
Expand All @@ -435,6 +460,10 @@ pub fn update_v2(bank: &Bank) -> std::result::Result<(), AccumulatorUpdateErrorV
}
},
}
if need_save {
account.set_data(price_account_data);
bank.store_account_and_update_capitalization(&pubkey, &account);
}
}

measure.stop();
Expand Down
135 changes: 133 additions & 2 deletions runtime/src/bank/pyth_accumulator_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ use {
AccountIndex, AccountSecondaryIndexes, AccountSecondaryIndexesIncludeExclude,
},
bank::{
pyth_accumulator::{get_accumulator_keys, ACCUMULATOR_RING_SIZE, ORACLE_PID},
pyth_accumulator::{
get_accumulator_keys, ACCUMULATOR_RING_SIZE, BATCH_PUBLISH_PID, ORACLE_PID,
},
pyth_batch_publish::publisher_prices_account::{self, PublisherPrice},
Bank,
},
genesis_utils::{create_genesis_config_with_leader, GenesisConfigInfo},
},
bytemuck::{cast_slice, checked::from_bytes},
byteorder::{ByteOrder, LittleEndian, ReadBytesExt},
itertools::Itertools,
pyth_oracle::{
Expand Down Expand Up @@ -45,7 +49,9 @@ fn create_new_bank_for_tests_with_index(genesis_config: &GenesisConfig) -> Bank
AccountSecondaryIndexes {
keys: Some(AccountSecondaryIndexesIncludeExclude {
exclude: false,
keys: [*ORACLE_PID, *MESSAGE_BUFFER_PID].into_iter().collect(),
keys: [*ORACLE_PID, *MESSAGE_BUFFER_PID, *BATCH_PUBLISH_PID]
.into_iter()
.collect(),
}),
indexes: [AccountIndex::ProgramId].into_iter().collect(),
},
Expand Down Expand Up @@ -899,3 +905,128 @@ fn test_get_accumulator_keys() {
];
assert_eq!(accumulator_keys, expected_pyth_keys);
}

#[test]
fn test_batch_publish() {
let leader_pubkey = solana_sdk::pubkey::new_rand();
let GenesisConfigInfo {
mut genesis_config, ..
} = create_genesis_config_with_leader(5, &leader_pubkey, 3);

// Set epoch length to 32 so we can advance epochs quickly. We also skip past slot 0 here
// due to slot 0 having special handling.
let slots_in_epoch = 32;
genesis_config.epoch_schedule = EpochSchedule::new(slots_in_epoch);
let mut bank = create_new_bank_for_tests_with_index(&genesis_config);

let generate_publisher = |seed, new_prices| {
let publisher1_key = keypair_from_seed(seed).unwrap();

let (publisher1_prices_key, _bump) = Pubkey::find_program_address(
// TODO: real seed
&[
b"PUBLISHER_PRICES_ACCOUNT",
&publisher1_key.pubkey().to_bytes(),
],
&BATCH_PUBLISH_PID,
);
let mut publisher1_prices_account =
AccountSharedData::new(42, publisher_prices_account::size(100), &BATCH_PUBLISH_PID);
{
let (header, prices) = publisher_prices_account::create(
publisher1_prices_account.data_mut(),
publisher1_key.pubkey().to_bytes(),
)
.unwrap();
publisher_prices_account::extend(header, prices, cast_slice(new_prices)).unwrap();
}
bank.store_account(&publisher1_prices_key, &publisher1_prices_account);

publisher1_key
};

let publishers = [
generate_publisher(
&[1u8; 32],
&[
PublisherPrice::new(1, 1, 10, 2).unwrap(),
PublisherPrice::new(2, 1, 20, 3).unwrap(),
],
),
generate_publisher(
&[2u8; 32],
&[
PublisherPrice::new(1, 1, 15, 2).unwrap(),
PublisherPrice::new(2, 1, 25, 3).unwrap(),
],
),
];

let generate_price = |seeds, index| {
let (price_feed_key, _bump) = Pubkey::find_program_address(&[seeds], &ORACLE_PID);
let mut price_feed_account =
AccountSharedData::new(42, size_of::<PriceAccount>(), &ORACLE_PID);

let messages = {
let price_feed_info_key = &price_feed_key.to_bytes().into();
let price_feed_info_lamports = &mut 0;
let price_feed_info_owner = &ORACLE_PID.to_bytes().into();
let price_feed_info_data = price_feed_account.data_mut();
let price_feed_info = AccountInfo::new(
price_feed_info_key,
false,
true,
price_feed_info_lamports,
price_feed_info_data,
price_feed_info_owner,
false,
Epoch::default(),
);

let mut price_account = PriceAccount::initialize(&price_feed_info, 0).unwrap();
price_account.flags.insert(
PriceAccountFlags::ACCUMULATOR_V2 | PriceAccountFlags::MESSAGE_BUFFER_CLEARED,
);
price_account.unused_3_ = index;
price_account.comp_[0].pub_ = publishers[0].pubkey().to_bytes().into();
price_account.comp_[1].pub_ = publishers[1].pubkey().to_bytes().into();
price_account.num_ = 2;
};

bank.store_account(&price_feed_key, &price_feed_account);
(price_feed_key, messages)
};

assert!(bank
.feature_set
.is_active(&feature_set::enable_accumulator_sysvar::id()));
assert!(bank
.feature_set
.is_active(&feature_set::move_accumulator_to_end_of_block::id()));
assert!(bank
.feature_set
.is_active(&feature_set::undo_move_accumulator_to_end_of_block::id()));
assert!(bank
.feature_set
.is_active(&feature_set::redo_move_accumulator_to_end_of_block::id()));

let prices_with_messages = [
generate_price(b"seeds_1", 1),
generate_price(b"seeds_2", 2),
generate_price(b"seeds_3", 3),
generate_price(b"seeds_4", 4),
];

bank = new_from_parent(&Arc::new(bank)); // Advance slot 1.
bank = new_from_parent(&Arc::new(bank)); // Advance slot 2.

let new_price_feed1_account = bank.get_account(&prices_with_messages[0].0).unwrap();
let new_price_feed1_data: &PriceAccount = from_bytes(new_price_feed1_account.data());
assert_eq!(new_price_feed1_data.comp_[0].latest_.price_, 10);
assert_eq!(new_price_feed1_data.comp_[1].latest_.price_, 15);

let new_price_feed2_account = bank.get_account(&prices_with_messages[1].0).unwrap();
let new_price_feed2_data: &PriceAccount = from_bytes(new_price_feed2_account.data());
assert_eq!(new_price_feed2_data.comp_[0].latest_.price_, 20);
assert_eq!(new_price_feed2_data.comp_[1].latest_.price_, 25);
}
Loading

0 comments on commit 872d1b2

Please sign in to comment.