Skip to content

Commit

Permalink
impl almost done, needs extra error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ksrichard committed Dec 19, 2024
1 parent 11437e5 commit 584c069
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,15 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{collections::HashMap, convert::TryFrom, fs, sync::Arc};
use std::{
collections::HashMap,
convert::TryFrom,
fs,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};

use chrono::Utc;
use chrono::{Duration, Utc};
use log::*;
use tari_common_types::types::{FixedHash, PublicKey};
use tari_crypto::tari_utilities::ByteArray;
Expand All @@ -48,7 +54,6 @@ use tari_template_builtin::{
FAUCET_TEMPLATE_ADDRESS,
};
use tari_template_lib::{models::TemplateAddress, Hash};
use tokio::sync::broadcast;

use super::TemplateConfig;
use crate::template_manager::{
Expand All @@ -60,6 +65,12 @@ const LOG_TARGET: &str = "tari::validator_node::template_manager";

const CONCURRENT_ACCESS_LIMIT: isize = 100;

#[derive(Debug, Clone)]
pub enum TemplateResult {
Template(Box<Template>),
PendingTemplate,
}

#[derive(Debug)]
pub struct TemplateManager<TAddr> {
global_db: GlobalDb<SqliteGlobalDbAdapter<TAddr>>,
Expand Down Expand Up @@ -132,21 +143,25 @@ impl<TAddr: NodeAddressable> TemplateManager<TAddr> {
}
}

pub fn template_exists(&self, address: &TemplateAddress) -> Result<bool, TemplateManagerError> {
pub fn template_exists(
&self,
address: &TemplateAddress,
status: TemplateStatus,
) -> Result<bool, TemplateManagerError> {
if self.builtin_templates.contains_key(address) {
return Ok(true);
}
let mut tx = self.global_db.create_transaction()?;
self.global_db
.templates(&mut tx)
.template_exists(address)
.template_exists(address, status)
.map_err(|_| TemplateManagerError::TemplateNotFound { address: *address })
}

pub fn fetch_template(&self, address: &TemplateAddress) -> Result<Template, TemplateManagerError> {
pub fn fetch_template(&self, address: &TemplateAddress) -> Result<TemplateResult, TemplateManagerError> {
// first of all, check if the address is for a bulitin template
if let Some(template) = self.builtin_templates.get(address) {
return Ok(template.to_owned());
return Ok(TemplateResult::Template(Box::new(template.to_owned())));
}

let mut tx = self.global_db.create_transaction()?;
Expand All @@ -156,6 +171,11 @@ impl<TAddr: NodeAddressable> TemplateManager<TAddr> {
.get_template(address)?
.ok_or(TemplateManagerError::TemplateNotFound { address: *address })?;

// notify the caller that the template is under sync, so not yet ready
if matches!(template.status, TemplateStatus::Pending) {
return Ok(TemplateResult::PendingTemplate);
}

if !matches!(template.status, TemplateStatus::Active | TemplateStatus::Deprecated) {
return Err(TemplateManagerError::TemplateUnavailable);
}
Expand All @@ -167,16 +187,16 @@ impl<TAddr: NodeAddressable> TemplateManager<TAddr> {
TemplateExecutable::CompiledWasm(wasm) => {
let binary = fs::read(dbg_replacement).expect("Could not read debug file");
*wasm = binary;
}
},
TemplateExecutable::Flow(_) => {
todo!("debug replacements for flow templates not implemented");
}
},
_ => return Err(TemplateManagerError::TemplateUnavailable),
}

Ok(result)
Ok(TemplateResult::Template(Box::new(result)))
} else {
Ok(template.into())
Ok(TemplateResult::Template(Box::new(template.into())))
}
}

Expand All @@ -201,7 +221,10 @@ impl<TAddr: NodeAddressable> TemplateManager<TAddr> {
let mut tx = self.global_db.create_transaction()?;
let mut templates_db = self.global_db.templates(&mut tx);
match templates_db.get_template(&template.template_address)? {
Some(_) => templates_db.update_template(&template.template_address, DbTemplateUpdate::status(TemplateStatus::Pending))?,
Some(_) => templates_db.update_template(
&template.template_address,
DbTemplateUpdate::status(TemplateStatus::Pending),
)?,
None => templates_db.insert_template(template)?,
}

Expand Down Expand Up @@ -236,22 +259,22 @@ impl<TAddr: NodeAddressable> TemplateManager<TAddr> {
template_hash = TemplateHash::Hash(template_hasher32().chain(binary.as_slice()).result());
compiled_code = Some(binary);
template_name = loaded_template.template_name().to_string();
}
},
TemplateExecutable::Manifest(curr_manifest) => {
template_hash = TemplateHash::Hash(template_hasher32().chain(curr_manifest.as_str()).result());
manifest = Some(curr_manifest);
template_type = DbTemplateType::Manifest;
}
},
TemplateExecutable::Flow(curr_flow_json) => {
template_hash = TemplateHash::Hash(template_hasher32().chain(curr_flow_json.as_str()).result());
flow_json = Some(curr_flow_json);
template_type = DbTemplateType::Flow;
}
},
TemplateExecutable::DownloadableWasm(url, hash) => {
template_url = Some(url.to_string());
template_type = DbTemplateType::Wasm;
template_hash = TemplateHash::FixedHash(hash);
}
},
}

let template = DbTemplate {
Expand Down Expand Up @@ -326,25 +349,47 @@ impl<TAddr: NodeAddressable + Send + Sync + 'static> TemplateProvider for Templa
return Ok(Some(template));
}

let Some(template) = self.fetch_template(address).optional()? else {
let Some(template_result) = self.fetch_template(address).optional()? else {
return Ok(None);
};

debug!(target: LOG_TARGET, "CACHE MISS: Template {}", address);

// getting template
let template = match template_result {
TemplateResult::Template(template) => template,
TemplateResult::PendingTemplate => {
let start = SystemTime::now();
let mut template = None;
while template.is_none() {
let elapsed = start.duration_since(UNIX_EPOCH).expect("Time went backwards");
if elapsed.gt(&self.config.pending_templates_wait_timeout()) {
break;
}
if let Some(TemplateResult::Template(fetched_template)) = self.fetch_template(address).optional()? {
template = Some(fetched_template);
}
}
debug!(target: LOG_TARGET, "Failed to fetch template {} within {:?}", address, self.config.pending_templates_wait_timeout());
template.ok_or(Self::Error::TemplateUnavailable)?
},
};

let loaded = match template.executable {
TemplateExecutable::CompiledWasm(wasm) => {
let module = WasmModule::from_code(wasm);
module.load_template()?
}
},
TemplateExecutable::Manifest(_) => return Err(TemplateManagerError::UnsupportedTemplateType),
TemplateExecutable::Flow(flow_json) => {
let definition: FlowFunctionDefinition = serde_json::from_str(&flow_json)?;
let factory = FlowFactory::try_create::<Self>(definition)?;
LoadedTemplate::Flow(factory)
}
},
TemplateExecutable::DownloadableWasm(_, _) => {
// impossible case, since there is no separate downloadable wasm type in DB level
return Err(Self::Error::UnsupportedTemplateType);
}
},
};

self.cache.insert(*address, loaded.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ mod initializer;
pub use initializer::spawn;

mod manager;
pub use manager::TemplateManager;
pub use manager::{TemplateManager, TemplateResult};

mod service;

mod cmap_semaphore;
Expand Down
Loading

0 comments on commit 584c069

Please sign in to comment.