diff --git a/tig-benchmarker/README.md b/tig-benchmarker/README.md new file mode 100644 index 00000000..2a1b1db3 --- /dev/null +++ b/tig-benchmarker/README.md @@ -0,0 +1,13 @@ +# TIG Browser Benchmarker + +This browser benchmarker is deployed to https://play.tig.foundation/benchmarker + +To run it locally, run the following commands before visiting localhost in your browser: + +``` +# uncomment below to install wasm-pack +# curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh +cd tig-benchmarker +wasm-pack build --release --target web +python3 -m http.server 80 +``` \ No newline at end of file diff --git a/tig-benchmarker/index.html b/tig-benchmarker/index.html new file mode 100644 index 00000000..3a6fd92a --- /dev/null +++ b/tig-benchmarker/index.html @@ -0,0 +1,20 @@ + + + + + TIG Benchmarker Test + + +

TIG Browser Benchmarker

+

Instructions:

+
    +
  1. Open the browser console
  2. +
  3. Setup: `await benchmarker.setup(<api_url>, <api_key>, <player_id>)`
  4. +
  5. Start: `await benchmarker.start(<num_workers>)`
  6. +
  7. Stop: `await benchmarker.stop()`
  8. +
  9. Select Algorithm: `await benchmarker.select_algorithm(<challenge_id>, <algorithm_id>)`
  10. +
  11. View State: `await benchmarker.state()`
  12. +
+ + + \ No newline at end of file diff --git a/tig-benchmarker/index.js b/tig-benchmarker/index.js new file mode 100644 index 00000000..67d0e042 --- /dev/null +++ b/tig-benchmarker/index.js @@ -0,0 +1,12 @@ +import init, { setup, state, start, stop, select_algorithm } from './pkg/tig_benchmarker.js'; + +async function loadWasm() { + console.log("Loading Benchmarker WASM"); + await init("./pkg/tig_benchmarker_bg.wasm"); + window.benchmarker = { + setup, state, start, stop, select_algorithm + }; + console.log("Benchmarker WASM initialized and functions are available globally"); +} + +loadWasm(); \ No newline at end of file diff --git a/tig-benchmarker/src/benchmarker.rs b/tig-benchmarker/src/benchmarker.rs deleted file mode 100644 index e8be7b64..00000000 --- a/tig-benchmarker/src/benchmarker.rs +++ /dev/null @@ -1,838 +0,0 @@ -use crate::future_utils::{self, time, Mutex}; -use once_cell::sync::OnceCell; -use rand::{ - distributions::{Alphanumeric, DistString, WeightedIndex}, - rngs::StdRng, - SeedableRng, -}; -use rand_distr::Distribution; -use serde::Serialize; -use std::collections::{HashMap, HashSet}; -use tig_api::*; -use tig_structs::{config::WasmVMConfig, core::*}; -use tig_utils::*; -use tig_worker::{compute_solution, ComputeResult}; - -type Result = std::result::Result; - -#[cfg_attr(feature = "browser", wasm_bindgen::prelude::wasm_bindgen)] -#[derive(Serialize, Clone, Debug)] -pub struct Duration { - pub start: u64, - pub end: u64, - pub now: u64, -} - -#[cfg_attr(feature = "browser", wasm_bindgen::prelude::wasm_bindgen)] -#[derive(Serialize, Debug, Clone)] -pub struct Job { - benchmark_id: String, - settings: BenchmarkSettings, - duration: Duration, - solution_signature_threshold: u32, - nonce_iter: NonceIterator, - wasm_vm_config: WasmVMConfig, -} - -#[cfg_attr(feature = "browser", wasm_bindgen::prelude::wasm_bindgen)] -#[derive(Serialize, Debug, Clone)] -pub struct State { - running: bool, - status: HashMap, - latest_block: Option, - benchmarker_data: Option, - challenges: Vec, - download_urls: HashMap, - algorithms_by_challenge: HashMap>, - selected_algorithms: HashMap, - benchmarks: HashMap, - proofs: HashMap, - frauds: HashMap, - job: Option, -} - -impl State { - pub fn new() -> State { - State { - running: false, - status: HashMap::new(), - latest_block: None, - algorithms_by_challenge: HashMap::new(), - selected_algorithms: HashMap::new(), - benchmarks: HashMap::new(), - proofs: HashMap::new(), - frauds: HashMap::new(), - benchmarker_data: None, - challenges: Vec::new(), - download_urls: HashMap::new(), - job: None, - } - } -} - -#[cfg_attr(feature = "browser", wasm_bindgen::prelude::wasm_bindgen)] -#[derive(Serialize, Debug, Clone)] -pub struct NonceIterator { - nonces: Option>, - current: u32, - attempts: u32, -} - -impl NonceIterator { - pub fn new(nonces: Option>) -> Self { - Self { - nonces, - current: 0, - attempts: 0, - } - } - pub fn attempts(&self) -> u32 { - self.attempts - } - pub fn is_recompute(&self) -> bool { - self.nonces.is_some() - } - pub fn is_finished(&self) -> bool { - self.nonces.as_ref().is_some_and(|x| x.is_empty()) || self.current == u32::MAX - } -} -impl Iterator for NonceIterator { - type Item = u32; - - fn next(&mut self) -> Option { - if let Some(nonces) = self.nonces.as_mut() { - let value = nonces.pop(); - self.attempts += value.is_some() as u32; - value - } else if self.current < u32::MAX { - let value = Some(self.current); - self.attempts += 1; - self.current += 1; - value - } else { - None - } - } -} - -static STATE: OnceCell> = OnceCell::new(); -static BLOBS: OnceCell>>> = OnceCell::new(); -static API: OnceCell = OnceCell::new(); -static PLAYER_ID: OnceCell = OnceCell::new(); -const BLOCK_DATA_POLLER_ID: &'static str = "Block data poller"; -const WORKER_ID: &'static str = "Benchmark worker"; -const MANAGER_ID: &'static str = "Benchmark manager"; -const B_SUBMITTER_ID: &'static str = "Benchmark submitter"; -const P_SUBMITTER_ID: &'static str = "Proof submitter"; - -pub fn mutex() -> &'static Mutex { - STATE.get().unwrap() -} - -pub async fn start() { - let mut state = mutex().lock().await; - (*state).running = true; -} - -pub async fn stop() { - let mut state = mutex().lock().await; - (*state).running = false; -} - -pub async fn select_algorithm(challenge_id: String, algorithm_id: String) { - let mut state = mutex().lock().await; - (*state) - .selected_algorithms - .insert(challenge_id, algorithm_id); -} - -pub async fn setup(api_url: String, api_key: String, player_id: String, num_workers: u32) { - BLOBS.get_or_init(|| Mutex::new(HashMap::new())); - STATE.get_or_init(|| Mutex::new(State::new())); - API.get_or_init(|| Api::new(api_url, api_key)); - PLAYER_ID.get_or_init(|| player_id); - - update_block_data().await.unwrap(); - future_utils::spawn(async { - update_status(BLOCK_DATA_POLLER_ID, "Running").await; - loop { - future_utils::sleep(30000).await; - if let Err(e) = update_block_data().await { - update_status(BLOCK_DATA_POLLER_ID, &e).await; - } - } - }); - - for _ in 0..num_workers { - future_utils::spawn(async { - update_status(WORKER_ID, "Stopped").await; - let mut curr_running: bool = false; - loop { - let next_running = { mutex().lock().await.running }; - if curr_running != next_running { - curr_running = next_running; - if curr_running { - update_status(WORKER_ID, "Starting").await; - } else { - update_status(WORKER_ID, "Stopped").await; - } - } - if !curr_running { - future_utils::sleep(5000).await; - continue; - } - if let Err(e) = do_benchmark().await { - update_status(WORKER_ID, &e.to_string()).await; - } - future_utils::sleep(1000).await; - } - }); - } - future_utils::spawn(async { - update_status(MANAGER_ID, "Stopped").await; - let mut curr_running: bool = false; - loop { - let next_running = { mutex().lock().await.running }; - if curr_running != next_running { - curr_running = next_running; - if curr_running { - update_status(MANAGER_ID, "Starting").await; - } else { - update_status(MANAGER_ID, "Stopped").await; - } - } - if !curr_running { - future_utils::sleep(5000).await; - continue; - } - if let Err(e) = do_manage_benchmark().await { - update_status(MANAGER_ID, &e.to_string()).await; - future_utils::sleep(5000).await; - } - } - }); - future_utils::spawn(async { - update_status(B_SUBMITTER_ID, "Stopped").await; - let mut curr_running: bool = false; - loop { - let next_running = { mutex().lock().await.running }; - if curr_running != next_running { - curr_running = next_running; - if curr_running { - update_status(B_SUBMITTER_ID, "Starting").await; - } else { - update_status(B_SUBMITTER_ID, "Stopped").await; - } - } - if !curr_running { - future_utils::sleep(5000).await; - continue; - } - if let Err(e) = do_submit_benchmark().await { - update_status(B_SUBMITTER_ID, &e.to_string()).await; - } - future_utils::sleep(5000).await; - } - }); - future_utils::spawn(async { - update_status(P_SUBMITTER_ID, "Stopped").await; - let mut curr_running: bool = false; - loop { - let next_running = { mutex().lock().await.running }; - if curr_running != next_running { - curr_running = next_running; - if curr_running { - update_status(P_SUBMITTER_ID, "Starting").await; - } else { - update_status(P_SUBMITTER_ID, "Stopped").await; - } - } - if !curr_running { - future_utils::sleep(5000).await; - continue; - } - if let Err(e) = do_submit_proof().await { - update_status(P_SUBMITTER_ID, &e.to_string()).await; - } - future_utils::sleep(5000).await; - } - }); -} - -async fn get_latest_block_id() -> String { - let state = mutex().lock().await; - state.latest_block.as_ref().unwrap().id.clone() -} - -async fn update_status(id: &str, status: &str) { - let s = format!("[{}]: {}", id, status); - println!("{}", s); - #[cfg(feature = "browser")] - web_sys::console::log_1(&s.into()); - let mut state = mutex().lock().await; - (*state).status.insert(id.to_string(), status.to_string()); -} - -async fn get_latest_block() -> Result { - let GetBlockResp { block, .. } = API - .get() - .unwrap() - .get_block(GetBlockReq { - id: None, - round: None, - height: None, - include_data: false, - }) - .await - .map_err(|e| format!("Failed to get latest block: {:?}", e))?; - Ok(block.unwrap()) -} - -async fn get_benchmarks() -> Result<(Vec, Vec, Vec)> { - let GetBenchmarksResp { - benchmarks, - proofs, - frauds, - .. - } = API - .get() - .unwrap() - .get_benchmarks(GetBenchmarksReq { - block_id: get_latest_block_id().await, - player_id: PLAYER_ID.get().unwrap().clone(), - }) - .await - .map_err(|e| format!("Failed to get benchmarks: {:?}", e))?; - Ok((benchmarks, proofs, frauds)) -} - -async fn get_benchmarker_data() -> Result> { - let GetPlayersResp { players, .. } = API - .get() - .unwrap() - .get_players(GetPlayersReq { - block_id: get_latest_block_id().await, - player_type: PlayerType::Benchmarker, - }) - .await - .map_err(|e| format!("Failed to get players: {:?}", e))?; - Ok(players - .into_iter() - .find(|x| x.id == *PLAYER_ID.get().unwrap()) - .map(|x| x.block_data.unwrap())) -} - -async fn get_challenges() -> Result> { - let GetChallengesResp { challenges, .. } = API - .get() - .unwrap() - .get_challenges(GetChallengesReq { - block_id: get_latest_block_id().await, - }) - .await - .map_err(|e| format!("Failed to get challenges: {:?}", e))?; - Ok(challenges) -} - -async fn get_algorithms() -> Result<(HashMap>, HashMap)> { - let GetAlgorithmsResp { - algorithms, wasms, .. - } = API - .get() - .unwrap() - .get_algorithms(GetAlgorithmsReq { - block_id: get_latest_block_id().await, - }) - .await - .map_err(|e| format!("Failed to get algorithms: {:?}", e))?; - let algorithms_by_challenge: HashMap> = - algorithms.into_iter().fold(HashMap::new(), |mut acc, x| { - acc.entry(x.details.challenge_id.clone()) - .or_default() - .push(x.clone()); - acc - }); - let download_urls = wasms - .into_iter() - .filter(|x| x.details.download_url.is_some()) - .map(|x| (x.algorithm_id, x.details.download_url.unwrap())) - .collect(); - Ok((algorithms_by_challenge, download_urls)) -} - -async fn update_block_data() -> Result<()> { - let block = get_latest_block().await?; - { - let mut state = mutex().lock().await; - (*state).latest_block = Some(block.clone()); - } - let results = future_utils::join( - get_algorithms(), - get_benchmarker_data(), - get_benchmarks(), - get_challenges(), - ) - .await?; - let mut state = mutex().lock().await; - let (algorithms_by_challenge, download_urls) = results.0?; - (*state).algorithms_by_challenge = algorithms_by_challenge; - (*state).download_urls = download_urls; - - let benchmarker_data = results.1?; - (*state).benchmarker_data = benchmarker_data; - - let (benchmarks, proofs, frauds) = results.2?; - (*state).benchmarks.retain(|_, x| { - x.details.block_started - >= block - .details - .height - .saturating_sub(block.config().benchmark_submissions.lifespan_period) - }); - let keys_to_keep: HashSet = state.benchmarks.keys().cloned().collect(); - (*state) - .proofs - .retain(|_, x| keys_to_keep.contains(&x.benchmark_id)); - (*state) - .frauds - .retain(|_, x| keys_to_keep.contains(&x.benchmark_id)); - for x in benchmarks { - (*state).benchmarks.insert(x.id.clone(), x); - } - for x in proofs { - (*state).proofs.insert(x.benchmark_id.clone(), x); - } - for x in frauds { - (*state).frauds.insert(x.benchmark_id.clone(), x); - } - - let challenges = results.3?; - (*state).challenges = challenges; - Ok(()) -} - -async fn find_settings_to_recompute() -> Option { - let state = mutex().lock().await; - for (benchmark_id, benchmark) in state.benchmarks.iter() { - if !state.proofs.contains_key(benchmark_id) && benchmark.state.is_some() { - let sampled_nonces = benchmark.state().sampled_nonces.clone().unwrap(); - return Some(Job { - benchmark_id: benchmark.id.clone(), - settings: benchmark.settings.clone(), - duration: Duration { - start: time(), - end: time() + 5000, - now: time(), - }, - solution_signature_threshold: u32::MAX, // is fine unless the player has committed fraud - nonce_iter: NonceIterator::new(Some(sampled_nonces)), - wasm_vm_config: WasmVMConfig { - max_memory: u64::MAX, - max_fuel: u64::MAX, - }, - }); - } - } - None -} - -async fn pick_settings_to_benchmark() -> Job { - let block_id = get_latest_block_id().await; - let state = mutex().lock().await; - let num_qualifiers_by_challenge = match &state.benchmarker_data { - Some(data) => data.num_qualifiers_by_challenge.clone().unwrap(), - None => HashMap::new(), - }; - let percent_qualifiers_by_challenge: HashMap = state - .challenges - .iter() - .map(|c| { - let player_num_qualifiers = *num_qualifiers_by_challenge.get(&c.id).unwrap_or(&0); - let challenge_num_qualifiers = *c.block_data().num_qualifiers(); - let percent = if player_num_qualifiers == 0 || challenge_num_qualifiers == 0 { - 0f64 - } else { - (player_num_qualifiers as f64) / (challenge_num_qualifiers as f64) - }; - (c.id.clone(), percent) - }) - .collect(); - let mut rng = StdRng::seed_from_u64(time() as u64); - let challenge_weights: Vec<(String, f64)> = state - .selected_algorithms - .keys() - .map(|challenge_id| { - ( - challenge_id.clone(), - 1f64 - percent_qualifiers_by_challenge[challenge_id] + 1e-10f64, - ) - }) - .collect(); - let dist = WeightedIndex::new( - &challenge_weights - .iter() - .map(|w| w.1.clone()) - .collect::>(), - ) - .unwrap(); - let index = dist.sample(&mut rng); - - let random_challenge_id = challenge_weights[index].0.clone(); - let selected_algorithm_id = state - .selected_algorithms - .get(&random_challenge_id) - .unwrap() - .clone(); - - let challenge = state - .challenges - .iter() - .find(|c| c.id == random_challenge_id) - .unwrap(); - let min_difficulty = challenge.details.min_difficulty(); - let max_difficulty = challenge.details.max_difficulty(); - let block_data = &challenge.block_data(); - let random_difficulty = block_data.base_frontier().sample(&mut rng).scale( - &min_difficulty, - &max_difficulty, - *block_data.scaling_factor(), - ); - Job { - benchmark_id: Alphanumeric.sample_string(&mut rng, 32), - settings: BenchmarkSettings { - player_id: PLAYER_ID.get().unwrap().clone(), - block_id, - challenge_id: random_challenge_id, - algorithm_id: selected_algorithm_id, - difficulty: random_difficulty, - }, - duration: Duration { - start: time(), - end: time() + 30000, - now: time(), - }, - solution_signature_threshold: *block_data.solution_signature_threshold(), - nonce_iter: NonceIterator::new(None), - wasm_vm_config: state - .latest_block - .as_ref() - .unwrap() - .config() - .wasm_vm - .clone(), - } -} - -async fn download_wasm_blob(algorithm_id: &String) -> Result> { - // lock it so that only do 1 download - let mut blobs = BLOBS.get().unwrap().lock().await; - if let Some(wasm_blob) = blobs.get(algorithm_id) { - Ok(wasm_blob.clone()) - } else { - let state = mutex().lock().await; - let download_url = state - .download_urls - .get(algorithm_id.as_str()) - .ok_or_else(|| format!("Algorithm {} does not have wasm download_url", algorithm_id))?; - let wasm = get::>(download_url.as_str(), None) - .await - .map_err(|e| format!("Failed to download wasm from {}: {:?}", download_url, e))?; - (*blobs).insert(algorithm_id.clone(), wasm.clone()); - Ok(wasm) - } -} - -async fn do_benchmark() -> Result<()> { - let mut last_algorithm_id = "None".to_string(); - let mut blob = Vec::new(); - while let Some((job, Some(nonce))) = { - let mut state = mutex().lock().await; - if state.running { - (*state) - .job - .as_mut() - .map(|x| (x.clone(), x.nonce_iter.next())) - } else { - None - } - } { - if last_algorithm_id != job.settings.algorithm_id { - blob = download_wasm_blob(&job.settings.algorithm_id).await?; - last_algorithm_id = job.settings.algorithm_id.clone(); - } - if let Ok(ComputeResult::ValidSolution(solution_data)) = compute_solution( - &job.settings, - nonce, - blob.as_slice(), - job.wasm_vm_config.max_memory, - job.wasm_vm_config.max_fuel, - ) { - if solution_data.calc_solution_signature() <= job.solution_signature_threshold { - let mut state = mutex().lock().await; - if let Some(Some(solutions_meta_data)) = (*state) - .benchmarks - .get_mut(&job.benchmark_id) - .map(|x| x.solutions_meta_data.as_mut()) - { - solutions_meta_data.push(solution_data.clone().into()); - } - if let Some(Some(solutions_data)) = (*state) - .proofs - .get_mut(&job.benchmark_id) - .map(|x| x.solutions_data.as_mut()) - { - solutions_data.push(solution_data); - if !job.nonce_iter.is_recompute() { - (*state) - .benchmarks - .get_mut(&job.benchmark_id) - .unwrap() - .details - .num_solutions += 1; - } - } else { - return Ok(()); - } - } - } - future_utils::sleep(1).await; - } - update_status(WORKER_ID, "Finished job").await; - Ok(()) -} - -async fn do_manage_benchmark() -> Result<()> { - update_status(MANAGER_ID, "Checking for any benchmarks to recompute").await; - let job = if let Some(x) = find_settings_to_recompute().await { - update_status(MANAGER_ID, "Found benchmark to recompute").await; - x - } else { - update_status(MANAGER_ID, "Picking new settings to benchmark").await; - pick_settings_to_benchmark().await - }; - update_status(MANAGER_ID, &format!("{:?}", job.settings)).await; - - update_status( - MANAGER_ID, - &format!("Downloading algorithm: {}", job.settings.algorithm_id), - ) - .await; - download_wasm_blob(&job.settings.algorithm_id).await?; - - update_status(MANAGER_ID, &format!("Setting up benchmark")).await; - let benchmark_id = job.benchmark_id.clone(); - let mut state = mutex().lock().await; - if !job.nonce_iter.is_recompute() { - let block_started = state.latest_block.as_ref().unwrap().details.height.clone(); - (*state).benchmarks.insert( - benchmark_id.clone(), - Benchmark { - id: benchmark_id.clone(), - settings: job.settings.clone(), - details: BenchmarkDetails { - block_started, - num_solutions: 0, - }, - state: None, - solutions_meta_data: Some(Vec::new()), - solution_data: None, - }, - ); - } - (*state).proofs.insert( - benchmark_id.clone(), - Proof { - benchmark_id: benchmark_id.clone(), - state: None, - solutions_data: Some(Vec::new()), - }, - ); - (*state).job = Some(job); - drop(state); - - loop { - let mut state = mutex().lock().await; - if !state.running { - break; - } - let job = (*state).job.as_mut().unwrap(); - job.duration.now = time(); - if job.duration.now > job.duration.end { - break; - } - - if job.nonce_iter.is_finished() { - break; - } - let num_attempts = job.nonce_iter.attempts(); - let num_solutions = state - .proofs - .get(&benchmark_id) - .unwrap() - .solutions_data() - .len(); - drop(state); - update_status( - MANAGER_ID, - &format!( - "Computed {} solutions out of {} instances", - num_solutions, num_attempts - ), - ) - .await; - - future_utils::sleep(200).await; - } - - let mut state = mutex().lock().await; - let num_solutions = state - .proofs - .get(&benchmark_id) - .unwrap() - .solutions_data() - .len(); - let num_attempts = state.job.as_ref().unwrap().nonce_iter.attempts(); - (*state).job = None; - if num_solutions == 0 { - (*state).benchmarks.remove(&benchmark_id); - (*state).proofs.remove(&benchmark_id); - } - drop(state); - update_status( - MANAGER_ID, - &format!( - "Finished. Computed {} solutions out of {} instances", - num_solutions, num_attempts - ), - ) - .await; - Ok(()) -} - -async fn do_submit_benchmark() -> Result<()> { - update_status(B_SUBMITTER_ID, "Finding benchmark to submit").await; - let benchmark_to_submit = { - let mut state = mutex().lock().await; - let State { - ref mut benchmarks, - ref proofs, - ref job, - .. - } = *state; - let job_benchmark_id = job.as_ref().map(|x| &x.benchmark_id); - benchmarks - .values_mut() - .find(|benchmark| { - job_benchmark_id != Some(&benchmark.id) && benchmark.solutions_meta_data.is_some() - }) - .map(|benchmark| { - ( - benchmark.id.clone(), - benchmark.settings.clone(), - benchmark.solutions_meta_data.take().unwrap(), - proofs - .get(&benchmark.id) - .unwrap() - .solutions_data() - .first() - .unwrap() - .clone(), - ) - }) - }; - if let Some((old_benchmark_id, settings, solutions_meta_data, solution_data)) = - benchmark_to_submit - { - update_status( - B_SUBMITTER_ID, - &format!("Submitting benchmark: {:?}", settings), - ) - .await; - let resp = API - .get() - .unwrap() - .submit_benchmark(SubmitBenchmarkReq { - settings, - solutions_meta_data, - solution_data, - }) - .await - .map_err(|e| format!("Failed to submit benchmark: {:?}", e))?; - update_status(B_SUBMITTER_ID, &format!("{:?}", resp)).await; - if resp.benchmark_id != old_benchmark_id { - let mut state = mutex().lock().await; - let mut benchmark = (*state).benchmarks.remove(&old_benchmark_id).unwrap(); - let mut proof = (*state).proofs.remove(&old_benchmark_id).unwrap(); - benchmark.id = resp.benchmark_id.clone(); - proof.benchmark_id = resp.benchmark_id.clone(); - (*state) - .benchmarks - .insert(resp.benchmark_id.clone(), benchmark); - (*state).proofs.insert(resp.benchmark_id.clone(), proof); - } - } else { - update_status(B_SUBMITTER_ID, "No benchmark to submit").await; - } - Ok(()) -} - -async fn do_submit_proof() -> Result<()> { - update_status(P_SUBMITTER_ID, "Finding proof to submit").await; - let proof_to_submit = { - let mut state = mutex().lock().await; - let State { - ref benchmarks, - ref mut proofs, - ref job, - .. - } = *state; - let job_benchmark_id = job.as_ref().map(|x| &x.benchmark_id); - proofs - .values_mut() - .find(|x| { - job_benchmark_id != Some(&x.benchmark_id) - && x.solutions_data.is_some() - && benchmarks - .get(&x.benchmark_id) - .is_some_and(|x| x.state.is_some()) - }) - .map(|x| { - let state = benchmarks - .get(&x.benchmark_id) - .unwrap() - .state - .as_ref() - .unwrap(); - let sampled_nonces: HashSet = - state.sampled_nonces.clone().unwrap().into_iter().collect(); - let mut solutions_data = x.solutions_data.take().unwrap(); - solutions_data.retain(|x| sampled_nonces.contains(&x.nonce)); - (x.benchmark_id.clone(), solutions_data) - }) - }; - if let Some((benchmark_id, solutions_data)) = proof_to_submit { - update_status( - P_SUBMITTER_ID, - &format!("Submitting proof for benchmark {}", benchmark_id), - ) - .await; - let resp = API - .get() - .unwrap() - .submit_proof(SubmitProofReq { - benchmark_id: benchmark_id.clone(), - solutions_data: solutions_data.into(), - }) - .await - .map_err(|e| { - format!( - "Failed to submit proof for benchmark {}: {:?}", - benchmark_id, e - ) - })?; - update_status(P_SUBMITTER_ID, &format!("{:?}", resp)).await; - } else { - update_status(P_SUBMITTER_ID, "No proof to submit").await; - } - Ok(()) -} diff --git a/tig-benchmarker/src/benchmarker/download_wasm.rs b/tig-benchmarker/src/benchmarker/download_wasm.rs new file mode 100644 index 00000000..1205d2b3 --- /dev/null +++ b/tig-benchmarker/src/benchmarker/download_wasm.rs @@ -0,0 +1,15 @@ +use super::{blobs, Job, Result}; +use tig_utils::get; + +pub async fn execute(job: &Job) -> Result> { + let mut blobs = blobs().lock().await; + if let Some(wasm_blob) = blobs.get(&job.settings.algorithm_id) { + Ok(wasm_blob.clone()) + } else { + let wasm = get::>(&job.download_url, None) + .await + .map_err(|e| format!("Failed to download wasm from {}: {:?}", job.download_url, e))?; + (*blobs).insert(job.settings.algorithm_id.clone(), wasm.clone()); + Ok(wasm) + } +} diff --git a/tig-benchmarker/src/benchmarker/find_proof_to_submit.rs b/tig-benchmarker/src/benchmarker/find_proof_to_submit.rs new file mode 100644 index 00000000..7ea1dbb6 --- /dev/null +++ b/tig-benchmarker/src/benchmarker/find_proof_to_submit.rs @@ -0,0 +1,34 @@ +use super::{state, QueryData, Result}; +use std::collections::HashSet; +use tig_worker::SolutionData; + +pub async fn execute() -> Result)>> { + let QueryData { + proofs, + benchmarks, + frauds, + .. + } = &mut state().lock().await.query_data; + for (benchmark_id, proof) in proofs.iter_mut() { + if proof.solutions_data.is_none() || frauds.contains_key(benchmark_id) { + continue; + } + if let Some(state) = &benchmarks[benchmark_id].state { + let sampled_nonces: HashSet = + state.sampled_nonces.clone().unwrap().into_iter().collect(); + let mut solutions_data = proof.solutions_data.take().unwrap(); + solutions_data.retain(|x| sampled_nonces.contains(&x.nonce)); + let extracted_nonces: HashSet = solutions_data.iter().map(|x| x.nonce).collect(); + if extracted_nonces != sampled_nonces { + return Err(format!( + "No solutions for sampled nonces: '{:?}'", + sampled_nonces + .difference(&extracted_nonces) + .collect::>() + )); + } + return Ok(Some((benchmark_id.clone(), solutions_data))); + } + } + Ok(None) +} diff --git a/tig-benchmarker/src/benchmarker/mod.rs b/tig-benchmarker/src/benchmarker/mod.rs new file mode 100644 index 00000000..6a5eb93d --- /dev/null +++ b/tig-benchmarker/src/benchmarker/mod.rs @@ -0,0 +1,377 @@ +mod download_wasm; +mod find_proof_to_submit; +mod query_data; +mod run_benchmark; +mod setup_job; +mod submit_benchmark; +mod submit_proof; + +use crate::future_utils::{sleep, spawn, time, Mutex}; +use once_cell::sync::OnceCell; +use serde::Serialize; +use std::{collections::HashMap, sync::Arc}; +use tig_api::Api; +use tig_structs::{config::WasmVMConfig, core::*}; + +pub type Result = std::result::Result; + +#[derive(Serialize, Clone, Debug)] +pub struct QueryData { + pub latest_block: Block, + pub player_data: Option, + pub challenges: Vec, + pub download_urls: HashMap, + pub algorithms_by_challenge: HashMap>, + pub benchmarks: HashMap, + pub proofs: HashMap, + pub frauds: HashMap, +} + +#[derive(Serialize, Clone, Debug)] +pub struct Duration { + pub start: u64, + pub end: u64, + pub now: u64, +} + +#[derive(Serialize, Debug, Clone)] +pub struct Job { + download_url: String, + benchmark_id: String, + settings: BenchmarkSettings, + solution_signature_threshold: u32, + sampled_nonces: Option>, + wasm_vm_config: WasmVMConfig, +} + +#[derive(Serialize, Debug, Clone)] +pub struct NonceIterator { + nonces: Option>, + current: u32, + attempts: u32, +} + +impl NonceIterator { + pub fn new(nonces: Option>) -> Self { + Self { + nonces, + current: 0, + attempts: 0, + } + } + pub fn attempts(&self) -> u32 { + self.attempts + } + pub fn is_empty(&self) -> bool { + self.nonces.as_ref().is_some_and(|x| x.is_empty()) || self.current == u32::MAX + } + pub fn empty(&mut self) { + if let Some(nonces) = self.nonces.as_mut() { + nonces.clear(); + } + self.current = u32::MAX; + } +} +impl Iterator for NonceIterator { + type Item = u32; + + fn next(&mut self) -> Option { + if let Some(nonces) = self.nonces.as_mut() { + let value = nonces.pop(); + self.attempts += value.is_some() as u32; + value + } else if self.current < u32::MAX { + let value = Some(self.current); + self.attempts += 1; + self.current += 1; + value + } else { + None + } + } +} + +#[derive(Serialize, Debug, Clone, PartialEq)] +pub enum Status { + Starting, + Running(String), + Stopping, + Stopped, +} +#[derive(Serialize, Debug, Clone)] +pub struct State { + pub status: Status, + pub query_data: QueryData, + pub selected_algorithms: HashMap, + pub job: Option, + pub submission_errors: HashMap, +} + +static BLOBS: OnceCell>>> = OnceCell::new(); +static STATE: OnceCell> = OnceCell::new(); +static API: OnceCell = OnceCell::new(); +static PLAYER_ID: OnceCell = OnceCell::new(); + +pub fn api() -> &'static Api { + API.get().expect("API should be initialised") +} + +pub fn blobs() -> &'static Mutex>> { + BLOBS.get().expect("BLOBS should be initialised") +} + +pub fn player_id() -> &'static String { + PLAYER_ID.get().expect("PLAYER_ID should be initialised") +} + +pub fn state() -> &'static Mutex { + STATE.get().expect("STATE should be initialised") +} + +async fn update_status(status: &str) { + let mut state = state().lock().await; + if let Status::Running(_) = state.status { + state.status = Status::Running(status.to_string()); + println!("{}", status); + #[cfg(feature = "browser")] + web_sys::console::log_1(&status.to_string().into()); + } +} + +async fn run_once(num_workers: u32) -> Result<()> { + update_status("Querying latest data").await; + // retain only benchmarks that are within the lifespan period + // preserves solution_meta_data and solution_data + let mut new_query_data = query_data::execute().await?; + { + let mut state = (*state()).lock().await; + let block_started_cutoff = new_query_data.latest_block.details.height.saturating_sub( + new_query_data + .latest_block + .config() + .benchmark_submissions + .lifespan_period, + ); + let mut latest_benchmarks = state.query_data.benchmarks.clone(); + latest_benchmarks.retain(|_, x| x.details.block_started >= block_started_cutoff); + latest_benchmarks.extend(new_query_data.benchmarks.drain()); + + let mut latest_proofs = state.query_data.proofs.clone(); + latest_proofs.retain(|id, _| latest_benchmarks.contains_key(id)); + latest_proofs.extend(new_query_data.proofs.drain()); + + let mut latest_frauds = state.query_data.frauds.clone(); + latest_frauds.retain(|id, _| latest_benchmarks.contains_key(id)); + latest_frauds.extend(new_query_data.frauds.drain()); + + (*state) + .submission_errors + .retain(|id, _| latest_benchmarks.contains_key(id)); + new_query_data.benchmarks = latest_benchmarks; + new_query_data.proofs = latest_proofs; + new_query_data.frauds = latest_frauds; + (*state).query_data = new_query_data; + } + + update_status("Finding proof to submit").await; + match find_proof_to_submit::execute().await? { + Some((benchmark_id, solutions_data)) => { + update_status(&format!("Submitting proof for {}", benchmark_id)).await; + if let Err(e) = submit_proof::execute(benchmark_id.clone(), solutions_data).await { + let mut state = state().lock().await; + state.submission_errors.insert(benchmark_id, e.clone()); + return Err(e); + } + update_status(&format!("Success. Proof {} submitted", benchmark_id)).await; + } + None => { + update_status("No proof to submit").await; + } + } + + // creates a benchmark & proof with job.benchmark_id + update_status("Selecting settings to benchmark").await; + setup_job::execute().await?; + let job = { + let state = state().lock().await; + state.job.clone().unwrap() + }; + update_status(&format!("{:?}", job.settings)).await; + + update_status(&format!( + "Downloading algorithm {}", + job.download_url.split("/").last().unwrap() + )) + .await; + let wasm = download_wasm::execute(&job).await?; + + // variables that are shared by workers + let nonce_iter = Arc::new(Mutex::new(NonceIterator::new(job.sampled_nonces.clone()))); + let solutions_data = Arc::new(Mutex::new(Vec::::new())); + update_status("Starting benchmark").await; + run_benchmark::execute( + num_workers, + &job, + &wasm, + nonce_iter.clone(), + solutions_data.clone(), + ) + .await; + let start = time(); + loop { + let now = time(); + { + // transfers solutions computed by workers to benchmark state + let num_solutions = + drain_solutions(job.benchmark_id.clone(), solutions_data.clone()).await; + let nonce_iter = (*nonce_iter).lock().await; + update_status(&format!( + "Computed {} solutions out of {} instances", + num_solutions, + nonce_iter.attempts() + )) + .await; + if now - start >= 15000 || nonce_iter.is_empty() { + break; + } + } + sleep(200).await; + } + { + // workers exit when iter returns None + (*(*nonce_iter).lock().await).empty(); + }; + + // transfers solutions computed by workers to benchmark state + let num_solutions = drain_solutions(job.benchmark_id.clone(), solutions_data.clone()).await; + if let Some(sampled_nonces) = job.sampled_nonces.as_ref() { + if num_solutions != sampled_nonces.len() as u32 { + let mut state = (*state()).lock().await; + (*state) + .query_data + .proofs + .get_mut(&job.benchmark_id) + .unwrap() + .solutions_data + .take(); + return Err(format!( + "Failed to recompute solutions for {}", + job.benchmark_id + )); + } else { + update_status(&format!( + "Finished. Recompute solutions for {}", + job.benchmark_id + )) + .await; + } + } else { + if num_solutions == 0 { + update_status("Finished. No solutions to submit").await; + } else { + update_status(&format!("Finished. Submitting {} solutions", num_solutions,)).await; + let benchmark_id = match submit_benchmark::execute(&job).await { + Ok(benchmark_id) => benchmark_id, + Err(e) => { + let mut state = (*state()).lock().await; + state + .submission_errors + .insert(job.benchmark_id.clone(), e.clone()); + return Err(e); + } + }; + update_status(&format!("Success. Benchmark {} submitted", benchmark_id)).await; + let mut state = (*state()).lock().await; + let QueryData { + benchmarks, proofs, .. + } = &mut (*state).query_data; + let mut benchmark = benchmarks.remove(&job.benchmark_id).unwrap(); + let mut proof = proofs.remove(&job.benchmark_id).unwrap(); + benchmark.id = benchmark_id.clone(); + proof.benchmark_id = benchmark_id.clone(); + benchmarks.insert(benchmark_id.clone(), benchmark); + proofs.insert(benchmark_id.clone(), proof); + } + } + Ok(()) +} + +async fn drain_solutions( + benchmark_id: String, + solutions_data: Arc>>, +) -> u32 { + let mut solutions_data = (*solutions_data).lock().await; + let mut state = (*state()).lock().await; + let QueryData { + benchmarks, proofs, .. + } = &mut (*state).query_data; + let benchmark = benchmarks.get_mut(&benchmark_id).unwrap(); + let proof = proofs.get_mut(&benchmark_id).unwrap(); + if let Some(x) = benchmark.solutions_meta_data.as_mut() { + x.extend( + solutions_data + .iter() + .map(|x| SolutionMetaData::from(x.clone())), + ); + benchmark.details.num_solutions = x.len() as u32; + } + let to_update = proof.solutions_data.as_mut().unwrap(); + to_update.extend((*solutions_data).drain(..)); + to_update.len() as u32 +} +pub async fn start(num_workers: u32) { + { + let mut state = (*state()).lock().await; + if state.status != Status::Stopped { + return; + } + state.status = Status::Starting; + } + spawn(async move { + { + let mut state = (*state()).lock().await; + state.status = Status::Running("Starting".to_string()); + } + loop { + { + let mut state = (*state()).lock().await; + if state.status == Status::Stopping { + state.status = Status::Stopped; + } + } + if let Err(e) = run_once(num_workers).await { + update_status(&format!("Error: {:?}", e)).await; + sleep(5000).await; + } + } + }); +} +pub async fn stop() { + let mut state = (*state()).lock().await; + match state.status { + Status::Running(_) => { + state.status = Status::Stopping; + } + _ => {} + } +} +pub async fn select_algorithm(challenge_id: String, algorithm_id: String) { + let mut state = (*state()).lock().await; + state.selected_algorithms.insert(challenge_id, algorithm_id); +} + +pub async fn setup(api_url: String, api_key: String, player_id: String) { + BLOBS.get_or_init(|| Mutex::new(HashMap::new())); + API.get_or_init(|| Api::new(api_url, api_key)); + PLAYER_ID.get_or_init(|| player_id); + let query_data = query_data::execute().await.expect("Failed to query data"); + STATE.get_or_init(|| { + Mutex::new(State { + status: Status::Stopped, + query_data, + selected_algorithms: HashMap::new(), + job: None, + submission_errors: HashMap::new(), + }) + }); +} diff --git a/tig-benchmarker/src/benchmarker/query_data.rs b/tig-benchmarker/src/benchmarker/query_data.rs new file mode 100644 index 00000000..7d797406 --- /dev/null +++ b/tig-benchmarker/src/benchmarker/query_data.rs @@ -0,0 +1,131 @@ +use std::collections::HashMap; + +use super::{api, player_id, QueryData, Result}; +use crate::future_utils; +use tig_api::*; +use tig_structs::core::*; + +pub async fn execute() -> Result { + let latest_block = query_latest_block().await?; + let results = future_utils::join( + query_algorithms(latest_block.id.clone()), + query_player_data(latest_block.id.clone()), + query_benchmarks(latest_block.id.clone()), + query_challenges(latest_block.id.clone()), + ) + .await?; + let (algorithms_by_challenge, download_urls) = results.0?; + let player_data = results.1?; + let (benchmarks, proofs, frauds) = results.2?; + let challenges = results.3?; + Ok(QueryData { + latest_block, + algorithms_by_challenge, + player_data, + download_urls, + benchmarks, + proofs, + frauds, + challenges, + }) +} + +async fn query_latest_block() -> Result { + let GetBlockResp { block, .. } = api() + .get_block(GetBlockReq { + id: None, + round: None, + height: None, + include_data: false, + }) + .await + .map_err(|e| format!("Failed to query latest block: {:?}", e))?; + Ok(block.ok_or_else(|| format!("Expecting latest block to exist"))?) +} + +async fn query_benchmarks( + block_id: String, +) -> Result<( + HashMap, + HashMap, + HashMap, +)> { + let GetBenchmarksResp { + benchmarks, + proofs, + frauds, + .. + } = api() + .get_benchmarks(GetBenchmarksReq { + block_id: block_id.clone(), + player_id: player_id().clone(), + }) + .await + .map_err(|e| format!("Failed to get benchmarks: {:?}", e))?; + Ok(( + benchmarks.into_iter().map(|x| (x.id.clone(), x)).collect(), + proofs + .into_iter() + .map(|x| (x.benchmark_id.clone(), x)) + .collect(), + frauds + .into_iter() + .map(|x| (x.benchmark_id.clone(), x)) + .collect(), + )) +} + +async fn query_player_data(block_id: String) -> Result> { + let GetPlayersResp { players, .. } = api() + .get_players(GetPlayersReq { + block_id: block_id.clone(), + player_type: PlayerType::Benchmarker, + }) + .await + .map_err(|e| format!("Failed to query players: {:?}", e))?; + let player_id = player_id().clone(); + match players.into_iter().find(|x| x.id == player_id) { + Some(player) => { + Ok(Some(player.block_data.ok_or_else(|| { + format!("Expecting player to have block_data") + })?)) + } + None => Ok(None), + } +} + +async fn query_challenges(block_id: String) -> Result> { + let GetChallengesResp { challenges, .. } = api() + .get_challenges(GetChallengesReq { + block_id: block_id.clone(), + }) + .await + .map_err(|e| format!("Failed to query challenges: {:?}", e))?; + Ok(challenges) +} + +async fn query_algorithms( + block_id: String, +) -> Result<(HashMap>, HashMap)> { + let GetAlgorithmsResp { + algorithms, wasms, .. + } = api() + .get_algorithms(GetAlgorithmsReq { + block_id: block_id.clone(), + }) + .await + .map_err(|e| format!("Failed to query algorithms: {:?}", e))?; + let algorithms_by_challenge: HashMap> = + algorithms.into_iter().fold(HashMap::new(), |mut acc, x| { + acc.entry(x.details.challenge_id.clone()) + .or_default() + .push(x.clone()); + acc + }); + let download_urls = wasms + .into_iter() + .filter(|x| x.details.download_url.is_some()) + .map(|x| (x.algorithm_id, x.details.download_url.unwrap())) + .collect(); + Ok((algorithms_by_challenge, download_urls)) +} diff --git a/tig-benchmarker/src/benchmarker/run_benchmark.rs b/tig-benchmarker/src/benchmarker/run_benchmark.rs new file mode 100644 index 00000000..4eaf8d12 --- /dev/null +++ b/tig-benchmarker/src/benchmarker/run_benchmark.rs @@ -0,0 +1,47 @@ +use super::{Job, NonceIterator}; +use crate::future_utils; +use future_utils::{sleep, spawn, Mutex}; +use std::sync::Arc; +use tig_worker::{compute_solution, ComputeResult, SolutionData}; + +pub async fn execute( + num_workers: u32, + job: &Job, + wasm: &Vec, + nonce_iter: Arc>, + solutions_data: Arc>>, +) { + for _ in 0..num_workers { + let job = job.clone(); + let wasm = wasm.clone(); + let nonce_iter = nonce_iter.clone(); + let solutions_data = solutions_data.clone(); + spawn(async move { + loop { + match { + let mut nonce_iter = (*nonce_iter).lock().await; + (*nonce_iter).next() + } { + None => break, + Some(nonce) => { + if let Ok(ComputeResult::ValidSolution(solution_data)) = compute_solution( + &job.settings, + nonce, + wasm.as_slice(), + job.wasm_vm_config.max_memory, + job.wasm_vm_config.max_fuel, + ) { + if solution_data.calc_solution_signature() + <= job.solution_signature_threshold + { + let mut solutions_data = (*solutions_data).lock().await; + (*solutions_data).push(solution_data); + } + } + sleep(1).await; + } + } + } + }) + } +} diff --git a/tig-benchmarker/src/benchmarker/setup_job.rs b/tig-benchmarker/src/benchmarker/setup_job.rs new file mode 100644 index 00000000..93c6bc4b --- /dev/null +++ b/tig-benchmarker/src/benchmarker/setup_job.rs @@ -0,0 +1,192 @@ +use super::{player_id, state, Job, QueryData, Result, State}; +use crate::future_utils::time; +use rand::{ + distributions::{Alphanumeric, DistString, WeightedIndex}, + rngs::StdRng, + SeedableRng, +}; +use rand_distr::Distribution; +use std::collections::HashMap; +use tig_structs::{config::WasmVMConfig, core::*}; +use tig_utils::{FrontierOps, PointOps}; + +pub async fn execute() -> Result<()> { + let job = if let Some(x) = find_settings_to_recompute().await? { + x + } else { + pick_settings_to_benchmark().await? + }; + let mut state = state().lock().await; + (*state).job.replace(job.clone()); + let QueryData { + latest_block, + benchmarks, + proofs, + .. + } = &mut state.query_data; + if job.sampled_nonces.is_none() { + benchmarks.insert( + job.benchmark_id.clone(), + Benchmark { + id: job.benchmark_id.clone(), + settings: job.settings.clone(), + details: BenchmarkDetails { + block_started: latest_block.details.height.clone(), + num_solutions: 0, + }, + state: None, + solutions_meta_data: Some(Vec::new()), + solution_data: None, + }, + ); + } + proofs.insert( + job.benchmark_id.clone(), + Proof { + benchmark_id: job.benchmark_id.clone(), + state: None, + solutions_data: Some(Vec::new()), + }, + ); + Ok(()) +} + +async fn find_settings_to_recompute() -> Result> { + let QueryData { + benchmarks, + proofs, + frauds, + download_urls, + .. + } = &state().lock().await.query_data; + for (benchmark_id, benchmark) in benchmarks.iter() { + if !frauds.contains_key(benchmark_id) + && !proofs.contains_key(benchmark_id) + && benchmark.state.is_some() + { + let sampled_nonces = benchmark.state().sampled_nonces.clone().ok_or_else(|| { + format!( + "Expecting benchmark '{}' to have sampled_nonces", + benchmark_id + ) + })?; + return Ok(Some(Job { + benchmark_id: benchmark.id.clone(), + download_url: get_download_url(&benchmark.settings.algorithm_id, download_urls)?, + settings: benchmark.settings.clone(), + solution_signature_threshold: u32::MAX, // is fine unless the player has committed fraud + sampled_nonces: Some(sampled_nonces), + wasm_vm_config: WasmVMConfig { + max_memory: u64::MAX, + max_fuel: u64::MAX, + }, + })); + } + } + Ok(None) +} + +async fn pick_settings_to_benchmark() -> Result { + let State { + query_data, + selected_algorithms, + .. + } = &(*state().lock().await); + let QueryData { + player_data, + challenges, + download_urls, + .. + } = query_data; + let mut rng = StdRng::seed_from_u64(time() as u64); + let challenge = pick_challenge(&mut rng, player_data, challenges, selected_algorithms)?; + let selected_algorithm_id = selected_algorithms[&challenge.id].clone(); + let difficulty = pick_difficulty(&mut rng, challenge)?; + Ok(Job { + benchmark_id: Alphanumeric.sample_string(&mut rng, 32), + download_url: get_download_url(&selected_algorithm_id, download_urls)?, + settings: BenchmarkSettings { + player_id: player_id().clone(), + block_id: query_data.latest_block.id.clone(), + challenge_id: challenge.id.clone(), + algorithm_id: selected_algorithm_id, + difficulty, + }, + solution_signature_threshold: *challenge.block_data().solution_signature_threshold(), + sampled_nonces: None, + wasm_vm_config: query_data.latest_block.config().wasm_vm.clone(), + }) +} + +fn pick_challenge<'a>( + rng: &mut StdRng, + player_data: &'a Option, + challenges: &'a Vec, + selected_algorithms: &HashMap, +) -> Result<&'a Challenge> { + let num_qualifiers_by_challenge = match &player_data { + Some(data) => data + .num_qualifiers_by_challenge + .clone() + .ok_or_else(|| format!("Expecting num_qualifiers_by_challenge"))?, + None => HashMap::new(), + }; + let percent_qualifiers_by_challenge: HashMap = challenges + .iter() + .map(|c| { + let player_num_qualifiers = *num_qualifiers_by_challenge.get(&c.id).unwrap_or(&0); + let challenge_num_qualifiers = *c.block_data().num_qualifiers(); + let percent = if player_num_qualifiers == 0 || challenge_num_qualifiers == 0 { + 0f64 + } else { + (player_num_qualifiers as f64) / (challenge_num_qualifiers as f64) + }; + (c.id.clone(), percent) + }) + .collect(); + let challenge_weights: Vec<(String, f64)> = selected_algorithms + .keys() + .map(|challenge_id| { + ( + challenge_id.clone(), + 1f64 - percent_qualifiers_by_challenge[challenge_id] + 1e-10f64, + ) + }) + .collect(); + let dist = WeightedIndex::new( + &challenge_weights + .iter() + .map(|w| w.1.clone()) + .collect::>(), + ) + .map_err(|e| format!("Failed to create WeightedIndex: {}", e))?; + let index = dist.sample(rng); + let random_challenge_id = challenge_weights[index].0.clone(); + let challenge = challenges + .iter() + .find(|c| c.id == *random_challenge_id) + .ok_or_else(|| "Selected challenge should exist")?; + Ok(challenge) +} + +fn pick_difficulty(rng: &mut StdRng, challenge: &Challenge) -> Result> { + let min_difficulty = challenge.details.min_difficulty(); + let max_difficulty = challenge.details.max_difficulty(); + let block_data = challenge.block_data(); + let random_difficulty = block_data.base_frontier().sample(rng).scale( + &min_difficulty, + &max_difficulty, + *block_data.scaling_factor(), + ); + Ok(random_difficulty) +} + +fn get_download_url( + algorithm_id: &String, + download_urls: &HashMap, +) -> Result { + Ok(download_urls + .get(algorithm_id) + .ok_or_else(|| format!("Algorithm {} does not have wasm download_url", algorithm_id))? + .clone()) +} diff --git a/tig-benchmarker/src/benchmarker/submit_benchmark.rs b/tig-benchmarker/src/benchmarker/submit_benchmark.rs new file mode 100644 index 00000000..750fbddf --- /dev/null +++ b/tig-benchmarker/src/benchmarker/submit_benchmark.rs @@ -0,0 +1,29 @@ +use super::{api, state, Job, QueryData, Result}; +use tig_api::SubmitBenchmarkReq; + +pub async fn execute(job: &Job) -> Result { + let QueryData { + proofs, benchmarks, .. + } = &mut state().lock().await.query_data; + let benchmark = benchmarks + .get_mut(&job.benchmark_id) + .ok_or_else(|| format!("Job benchmark should exist"))?; + let proof = proofs + .get(&job.benchmark_id) + .ok_or_else(|| format!("Job proof should exist"))?; + let settings = benchmark.settings.clone(); + let solutions_meta_data = benchmark.solutions_meta_data.take().unwrap(); + let solution_data = proof.solutions_data().first().unwrap().clone(); + let resp = api() + .submit_benchmark(SubmitBenchmarkReq { + settings, + solutions_meta_data, + solution_data, + }) + .await + .map_err(|e| format!("Failed to submit benchmark: {:?}", e))?; + match resp.verified { + Ok(_) => Ok(resp.benchmark_id), + Err(e) => Err(format!("Benchmark flagged as fraud: {}", e)), + } +} diff --git a/tig-benchmarker/src/benchmarker/submit_proof.rs b/tig-benchmarker/src/benchmarker/submit_proof.rs new file mode 100644 index 00000000..2f189505 --- /dev/null +++ b/tig-benchmarker/src/benchmarker/submit_proof.rs @@ -0,0 +1,17 @@ +use super::{api, Result}; +use tig_api::SubmitProofReq; +use tig_worker::SolutionData; + +pub async fn execute(benchmark_id: String, solutions_data: Vec) -> Result<()> { + let resp = api() + .submit_proof(SubmitProofReq { + benchmark_id, + solutions_data, + }) + .await + .map_err(|e| format!("Failed to submit proof: {:?}", e))?; + match resp.verified { + Ok(_) => Ok(()), + Err(e) => Err(format!("Proof flagged as fraud: {}", e)), + } +} diff --git a/tig-benchmarker/src/future_utils.rs b/tig-benchmarker/src/future_utils.rs index f69cfcb8..c2cdccd1 100644 --- a/tig-benchmarker/src/future_utils.rs +++ b/tig-benchmarker/src/future_utils.rs @@ -1,65 +1,113 @@ +#[cfg(all(feature = "standalone", feature = "browser"))] +compile_error!("features `standalone` and `browser` are mutually exclusive"); + use serde::{de::DeserializeOwned, Serialize}; use std::future::Future; -pub use futures::lock::Mutex; -use gloo_timers::future::TimeoutFuture; -use js_sys::{Array, Date, Promise}; -use serde_wasm_bindgen::{from_value, to_value}; -use wasm_bindgen::prelude::*; -use wasm_bindgen_futures::future_to_promise; -use wasm_bindgen_futures::JsFuture; +#[cfg(feature = "standalone")] +mod utils { + use super::*; + use std::time::{SystemTime, UNIX_EPOCH}; + pub use tokio::sync::Mutex; + use tokio::{join, time}; -fn to_string(e: T) -> String { - format!("{:?}", e) -} -pub async fn join( - a: impl Future + 'static, - b: impl Future + 'static, - c: impl Future + 'static, - d: impl Future + 'static, -) -> Result<(T, U, V, W), String> -where - T: Serialize + DeserializeOwned + 'static, - U: Serialize + DeserializeOwned + 'static, - V: Serialize + DeserializeOwned + 'static, - W: Serialize + DeserializeOwned + 'static, -{ - let a = future_to_promise(async move { Ok(to_value(&a.await)?) }); - let b = future_to_promise(async move { Ok(to_value(&b.await)?) }); - let c = future_to_promise(async move { Ok(to_value(&c.await)?) }); - let d = future_to_promise(async move { Ok(to_value(&d.await)?) }); - - let promises = Array::new(); - promises.push(&a); - promises.push(&b); - promises.push(&c); - promises.push(&d); - - let js_promise = Promise::all(&promises); - let js_values = JsFuture::from(js_promise).await.map_err(to_string)?; - - let values = js_values.dyn_into::().map_err(to_string)?; - let results = ( - from_value(values.get(0)).map_err(to_string)?, - from_value(values.get(1)).map_err(to_string)?, - from_value(values.get(2)).map_err(to_string)?, - from_value(values.get(3)).map_err(to_string)?, - ); - - Ok(results) -} + pub async fn join( + a: impl Future + 'static, + b: impl Future + 'static, + c: impl Future + 'static, + d: impl Future + 'static, + ) -> Result<(T, U, V, W), String> + where + T: Serialize + DeserializeOwned + 'static, + U: Serialize + DeserializeOwned + 'static, + V: Serialize + DeserializeOwned + 'static, + W: Serialize + DeserializeOwned + 'static, + { + Ok(join!(a, b, c, d)) + } -pub fn spawn(f: impl Future + 'static) { - let _ = future_to_promise(async move { - f.await; - Ok(JsValue::undefined()) - }); -} + pub fn spawn(f: impl Future + 'static + Send) { + tokio::spawn(f); + } -pub async fn sleep(ms: u32) { - TimeoutFuture::new(ms).await; + pub async fn sleep(ms: u32) { + time::sleep(time::Duration::from_millis(ms as u64)).await; + } + + pub fn time() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64 + } } -pub fn time() -> u64 { - Date::now() as u64 +#[cfg(feature = "browser")] +mod utils { + use super::*; + pub use futures::lock::Mutex; + use gloo_timers::future::TimeoutFuture; + use js_sys::{Array, Date, Promise}; + use serde_wasm_bindgen::{from_value, to_value}; + use wasm_bindgen::prelude::*; + use wasm_bindgen_futures::future_to_promise; + use wasm_bindgen_futures::JsFuture; + + fn to_string(e: T) -> String { + format!("{:?}", e) + } + pub async fn join( + a: impl Future + 'static, + b: impl Future + 'static, + c: impl Future + 'static, + d: impl Future + 'static, + ) -> Result<(T, U, V, W), String> + where + T: Serialize + DeserializeOwned + 'static, + U: Serialize + DeserializeOwned + 'static, + V: Serialize + DeserializeOwned + 'static, + W: Serialize + DeserializeOwned + 'static, + { + let a = future_to_promise(async move { Ok(to_value(&a.await)?) }); + let b = future_to_promise(async move { Ok(to_value(&b.await)?) }); + let c = future_to_promise(async move { Ok(to_value(&c.await)?) }); + let d = future_to_promise(async move { Ok(to_value(&d.await)?) }); + + let promises = Array::new(); + promises.push(&a); + promises.push(&b); + promises.push(&c); + promises.push(&d); + + let js_promise = Promise::all(&promises); + let js_values = JsFuture::from(js_promise).await.map_err(to_string)?; + + let values = js_values.dyn_into::().map_err(to_string)?; + let results = ( + from_value(values.get(0)).map_err(to_string)?, + from_value(values.get(1)).map_err(to_string)?, + from_value(values.get(2)).map_err(to_string)?, + from_value(values.get(3)).map_err(to_string)?, + ); + + Ok(results) + } + + pub fn spawn(f: impl Future + 'static) { + // Convert the Rust Future into a JavaScript Promise + let _ = future_to_promise(async move { + f.await; + Ok(JsValue::undefined()) + }); + } + + pub async fn sleep(ms: u32) { + TimeoutFuture::new(ms).await; + } + + pub fn time() -> u64 { + Date::now() as u64 + } } + +pub use utils::*; diff --git a/tig-benchmarker/src/lib.rs b/tig-benchmarker/src/lib.rs index 237b9aeb..e0e2aaa5 100644 --- a/tig-benchmarker/src/lib.rs +++ b/tig-benchmarker/src/lib.rs @@ -8,13 +8,13 @@ mod exports { #[wasm_bindgen] pub async fn state() -> JsValue { - let state = benchmarker::mutex().lock().await.clone(); + let state = benchmarker::state().lock().await.clone(); serde_wasm_bindgen::to_value(&state).unwrap() } #[wasm_bindgen] - pub async fn start() { - benchmarker::start().await; + pub async fn start(num_workers: u32) { + benchmarker::start(num_workers).await; } #[wasm_bindgen] @@ -28,7 +28,7 @@ mod exports { } #[wasm_bindgen] - pub async fn setup(api_url: String, api_key: String, player_id: String, num_workers: u32) { - benchmarker::setup(api_url, api_key, player_id.to_string(), num_workers).await; + pub async fn setup(api_url: String, api_key: String, player_id: String) { + benchmarker::setup(api_url, api_key, player_id.to_string()).await; } }