feat(consensus): sequence transaction from foreign LocalPrepare/Accept
sdbondi committed Aug 22, 2024
1 parent 7f0a322 commit 2acb91d
Showing 71 changed files with 1,802 additions and 1,032 deletions.
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

10 changes: 0 additions & 10 deletions applications/tari_dan_app_utilities/src/transaction_executor.rs
@@ -130,16 +130,6 @@ where TTemplateProvider: TemplateProvider<Template = LoadedTemplate>
self.network,
);
let result = processor.execute(transaction.clone())?;
// Ok(result) => result,
// // TODO: This may occur due to an internal error (e.g. OOM, etc).
// Err(err) => ExecuteResult {
// finalize: FinalizeResult::new_rejected(
// tx_id,
// RejectReason::ExecutionFailure(format!("BUG: {err}")),
// ),
// execution_time: Duration::default(),
// },
// };

Ok(ExecutionOutput { transaction, result })
}
4 changes: 3 additions & 1 deletion applications/tari_indexer/src/json_rpc/handlers.rs
@@ -29,7 +29,7 @@ use axum_jrpc::{
JsonRpcResponse,
};
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
use log::warn;
use log::{info, warn};
use serde_json::{self as json, json, Value};
use tari_base_node_client::{grpc::GrpcBaseNodeClient, types::BaseLayerConsensusConstants, BaseNodeClient};
use tari_crypto::tari_utilities::hex::to_hex;
@@ -535,6 +535,8 @@ impl JsonRpcHandlers {
e => Self::internal_error(answer_id, e),
})?;

info!(target: LOG_TARGET, "✅ Transaction submitted: {}", transaction_id);

Ok(JsonRpcResponse::success(answer_id, SubmitTransactionResponse {
result: IndexerTransactionFinalizedResult::Pending,
transaction_id,
1 change: 1 addition & 0 deletions applications/tari_swarm_daemon/Cargo.toml
@@ -31,6 +31,7 @@ humantime = { workspace = true }
include_dir = { workspace = true }
json5 = { workspace = true }
lockfile = "0.4.0"
slug = "0.1.6"
log = { workspace = true }
mime_guess = { workspace = true }
reqwest = { workspace = true, features = ["json"] }
2 changes: 2 additions & 0 deletions applications/tari_swarm_daemon/src/cli.rs
@@ -69,6 +69,8 @@ pub struct Overrides {
pub binaries_root: Option<PathBuf>,
#[clap(long)]
pub start_port: Option<u16>,
#[clap(short = 'k', long)]
pub skip_registration: bool,
}

impl Overrides {
7 changes: 6 additions & 1 deletion applications/tari_swarm_daemon/src/config.rs
@@ -16,7 +16,7 @@ use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
};

use crate::cli::Cli;
use crate::cli::{Cli, Commands};

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Config {
@@ -26,6 +26,8 @@ pub struct Config {
pub webserver: WebserverConfig,
#[serde(flatten)]
pub processes: ProcessesConfig,
#[serde(default)]
pub skip_registration: bool,
}

impl Config {
@@ -54,6 +56,9 @@ impl Config {
}

fn overrides_from_cli(&mut self, cli: &Cli) {
if let Commands::Start(ref overrides) = cli.command {
self.skip_registration = overrides.skip_registration;
}
if let Some(ref base_dir) = cli.common.base_dir {
self.base_dir.clone_from(base_dir);
}
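
The new `-k`/`--skip-registration` flag flows from the CLI override into the config and, further down in this diff, into the process manager's setup. A minimal standalone sketch of that override pattern, with simplified stand-ins for the real `Overrides`/`Config` types (assumes the `clap` derive feature plus the `serde` and `toml` crates):

```rust
// Hypothetical, simplified stand-ins for the swarm daemon's CLI/config types,
// showing how a boolean CLI flag overrides a serde-defaulted config field.
use clap::Parser;
use serde::Deserialize;

#[derive(Parser, Debug)]
struct Overrides {
    /// Skip initial mining and validator node registration
    #[clap(short = 'k', long)]
    skip_registration: bool,
}

#[derive(Deserialize, Debug)]
struct Config {
    #[serde(default)]
    skip_registration: bool,
}

fn main() {
    let overrides = Overrides::parse();

    // A config file that never mentions the field still parses, thanks to #[serde(default)].
    let mut config: Config = toml::from_str("").expect("empty config should parse");

    // The CLI value wins, mirroring overrides_from_cli above.
    config.skip_registration = overrides.skip_registration;

    println!("skip_registration = {}", config.skip_registration);
}
```

Running the sketch with `-k` prints `skip_registration = true`; without it, the serde default (`false`) is kept.
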
6 changes: 4 additions & 2 deletions applications/tari_swarm_daemon/src/main.rs
@@ -172,9 +172,10 @@ fn get_base_config(cli: &Cli) -> anyhow::Result<Config> {
.ok()
.and_then(|p| p.parent().map(|p| p.to_path_buf()))
})
.unwrap_or_else(|| std::env::current_dir().unwrap());
.unwrap_or_else(|| std::env::current_dir().unwrap().join("data").join("swarm"));

Ok(Config {
skip_registration: false,
network: cli.common.network.unwrap_or(Network::LocalNet),
start_port: 12000,
base_dir: base_dir
@@ -204,14 +205,15 @@ async fn start(cli: &Cli) -> anyhow::Result<()> {

create_paths(&config).await?;

let shutdown = Shutdown::new();
let mut shutdown = Shutdown::new();
let signal = shutdown.to_signal().select(exit_signal()?);
let (task_handle, pm_handle) = process_manager::spawn(&config, shutdown.to_signal());
let webserver = webserver::spawn(config, shutdown.to_signal(), pm_handle.clone());

tokio::select! {
_ = signal => {
log::info!("Terminating all instances...");
shutdown.trigger();
let num_instances = pm_handle.stop_all().await?;
log::info!("Terminated {num_instances} instances");
},
@@ -11,6 +11,7 @@ use std::{

use anyhow::{anyhow, Context};
use log::info;
use slug::slugify;
use tari_common::configuration::Network;
use tokio::{
fs,
@@ -83,7 +84,7 @@ impl InstanceManager {
self.fork_new(
executable,
instance.instance_type,
format!("{}-#{}", instance.name, i),
format!("{}-#{:02}", instance.name, i),
instance.settings.clone(),
)
.await?;
@@ -131,10 +132,7 @@ impl InstanceManager {

let mut allocated_ports = ports.unwrap_or_else(|| self.port_allocator.create());

let base_path = self
.base_path
.join("processes")
.join(format!("{instance_id}-{instance_type}"));
let base_path = self.base_path.join("processes").join(slugify(&instance_name));
fs::create_dir_all(&base_path).await?;

let context = ProcessContext::new(
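
Process directories are now named after the slugified instance name (which carries the zero-padded counter from `fork_new`) instead of `{instance_id}-{instance_type}`. A rough sketch of what `slug::slugify` produces for such a name; the sample name and expected output are illustrative, not taken from the repo:

```rust
// Illustrative only: slugify lowercases the name and collapses runs of
// non-alphanumeric characters into single dashes, yielding a
// filesystem-friendly directory name per instance.
use slug::slugify;

fn main() {
    // A hypothetical name in the "{name}-#{:02}" format used by fork_new above.
    let instance_name = "Validator node-#01";
    let dir = slugify(instance_name);
    println!("{dir}"); // expected: "validator-node-01"
}
```
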
42 changes: 31 additions & 11 deletions applications/tari_swarm_daemon/src/process_manager/manager.rs
@@ -27,12 +27,14 @@ pub struct ProcessManager {
instance_manager: InstanceManager,
rx_request: mpsc::Receiver<ProcessManagerRequest>,
shutdown_signal: ShutdownSignal,
skip_registration: bool,
}

impl ProcessManager {
pub fn new(config: &Config, shutdown_signal: ShutdownSignal) -> (Self, ProcessManagerHandle) {
let (tx_request, rx_request) = mpsc::channel(1);
let this = Self {
skip_registration: config.skip_registration,
executable_manager: ExecutableManager::new(
config.processes.executables.clone(),
config.processes.force_compile,
@@ -49,21 +51,39 @@
(this, ProcessManagerHandle::new(tx_request))
}

pub async fn start(mut self) -> anyhow::Result<()> {
async fn setup(&mut self) -> anyhow::Result<()> {
info!("Starting process manager");
let executables = self.executable_manager.prepare_all().await?;
self.instance_manager.fork_all(executables).await?;

let num_vns = self.instance_manager.num_validator_nodes();
// Mine some initial funds, guessing 10 blocks to allow for coinbase maturity
self.mine(num_vns + 10).await.context("mining failed")?;
self.wait_for_wallet_funds(num_vns)
.await
.context("waiting for wallet funds")?;
if !self.skip_registration {
let num_vns = self.instance_manager.num_validator_nodes();
// Mine some initial funds, guessing 10 blocks to allow for coinbase maturity
self.mine(num_vns + 10).await.context("mining failed")?;
self.wait_for_wallet_funds(num_vns)
.await
.context("waiting for wallet funds")?;

self.register_all_validator_nodes()
.await
.context("registering validator node via GRPC")?;
}

Ok(())
}

self.register_all_validator_nodes()
.await
.context("registering validator node via GRPC")?;
pub async fn start(mut self) -> anyhow::Result<()> {
let mut shutdown_signal = self.shutdown_signal.clone();

tokio::select! {
result = self.setup() => {
result?;
},
_ = shutdown_signal.wait() => {
info!("Shutting down process manager");
return Ok(());
}
}

loop {
tokio::select! {
@@ -228,7 +248,7 @@ impl ProcessManager {
// inputs for a transaction.
sleep(Duration::from_secs(2)).await;
}
self.mine(10).await?;
self.mine(20).await?;
Ok(())
}

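
Setup (preparing executables, mining initial funds, registering validator nodes) is now raced against the shutdown signal, and `main.rs` above triggers that signal when the exit signal fires, so an exit request during the long mining/registration phase no longer hangs the daemon. A minimal sketch of the pattern, assuming `tokio` (rt, macros, time), `anyhow` and the `tari_shutdown` crate, with a hypothetical `slow_setup` standing in for the real setup work:

```rust
// Sketch: race a long-running setup against a shutdown signal.
use std::time::Duration;

use tari_shutdown::Shutdown;
use tokio::time::sleep;

async fn slow_setup() -> anyhow::Result<()> {
    // Stand-in for mining + validator registration.
    sleep(Duration::from_secs(60)).await;
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut shutdown = Shutdown::new();
    let mut shutdown_signal = shutdown.to_signal();

    // Simulate an external exit request arriving mid-setup, the way start()
    // in main.rs now calls shutdown.trigger() on the exit signal.
    tokio::spawn(async move {
        sleep(Duration::from_millis(100)).await;
        shutdown.trigger();
    });

    tokio::select! {
        result = slow_setup() => {
            result?;
            println!("setup finished");
        },
        _ = shutdown_signal.wait() => {
            println!("shutdown requested before setup completed");
        }
    }
    Ok(())
}
```
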
@@ -18,6 +18,7 @@ pub struct ListValidatorNodesResponse {

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidatorNodeInfo {
pub instance_id: InstanceId,
pub name: String,
pub web: String,
pub jrpc: String,
@@ -42,6 +43,7 @@ pub async fn list(
let jrpc = format!("http://localhost:{json_rpc_port}");

Ok(ValidatorNodeInfo {
instance_id: instance.id,
name: instance.name,
web,
jrpc,
39 changes: 24 additions & 15 deletions applications/tari_swarm_daemon/webui/src/routes/Main.tsx
@@ -37,16 +37,15 @@ async function jsonRpc2(address: string, method: string, params: any = null) {
return json.result;
}

function ExtraInfoVN({ name, url, setRow, addTxToPool, autoRefresh, state, horizontal }: {
function ExtraInfoVN({ name, url, addTxToPool, autoRefresh, state, horizontal }: {
name: string,
url: string,
setRow: any,
addTxToPool: any,
autoRefresh: boolean,
state: any,
horizontal: boolean
}) {
const [bucket, setBucket] = useState(null);
const [committeeInfo, setCommitteeInfo] = useState<any>(null);
const [epoch, setEpoch] = useState(null);
const [height, setHeight] = useState(null);
const [pool, setPool] = useState([]);
@@ -65,8 +64,8 @@ function ExtraInfoVN({ name, url, setRow, addTxToPool, autoRefresh, state, horiz
}, [tick, autoRefresh]);
useEffect(() => {
jsonRpc2(url, "get_epoch_manager_stats").then((resp) => {
setRow(resp.committee_shard.shard + 1);
setBucket(resp.committee_shard.shard);
// setRow(resp.committee_info.shard + 1);
setCommitteeInfo(resp.committee_info);
setHeight(resp.current_block_height);
setEpoch(resp.current_epoch);
}).catch((resp) => {
@@ -202,12 +201,12 @@ function ExtraInfoVN({ name, url, setRow, addTxToPool, autoRefresh, state, horiz
gridTemplateColumns: "auto auto",
gridTemplateRows: "auto auto auto auto auto",
}}>
<div><b>Bucket</b></div>
<div><b>Shard Group</b></div>
<div><b>Height</b></div>
<div><b>Epoch</b></div>
<div><b>Public key</b></div>
<div><b>Peer id</b></div>
<div>{bucket}</div>
<div>{committeeInfo?.shard_group.start}-{committeeInfo?.shard_group.end_inclusive} ({committeeInfo?.num_shard_group_members} members)</div>
<div>{height}</div>
<div>{epoch}</div>
<div>{publicKey}</div>
@@ -234,7 +233,6 @@ function ShowInfo(params: any) {
horizontal,
onReload,
} = params;
const [row, setRow] = useState(1);
// const [unprocessedTx, setUnprocessedTx] = useState([]);
const nameInfo = name && (
<div>
@@ -304,16 +302,15 @@


return (
<div className="info" key={name} style={{ gridRow: row }}>
<div className="info" key={name}>
{nameInfo}
{httpInfo}
{jrpcInfo}
{grpcInfo}
{showLogs && logInfo}
{executable === Executable.ValidatorNode && node?.jrpc &&
<ExtraInfoVN name={name} url={node.jrpc} setRow={(new_row: any) => {
if (new_row != row) setRow(new_row);
}} addTxToPool={addTxToPool} autoRefresh={autoRefresh} state={state} horizontal={horizontal} />}
<ExtraInfoVN name={name} url={node.jrpc} addTxToPool={addTxToPool} autoRefresh={autoRefresh} state={state}
horizontal={horizontal} />}
{executable !== Executable.Templates &&
<NodeControls
isRunning={node?.is_running || false}
@@ -364,11 +361,23 @@ function ShowInfos(params: any) {
setState((state) => ({ ...state, [partial_state.name]: partial_state.state }));
}
};

const sortedNodes = Object.keys(nodes).map((key) => [key, nodes[key]]);
sortedNodes.sort((a, b) => {
if (a[1].instance_id > b[1].instance_id) {
return 1;
}
if (a[1].instance_id < b[1].instance_id) {
return -1;
}
return 0;
});

return (
<div className="infos" style={{ display: "grid" }}>
{Object.keys(nodes).map((index) =>
<ShowInfo key={index} executable={executable} name={nodes[index].name} node={nodes[index]}
logs={logs?.[`${name} ${index}`]} stdoutLogs={stdoutLogs?.[`${name} ${index}`]}
{sortedNodes.map(([key, node]) =>
<ShowInfo key={key} executable={executable} name={node.name} node={node}
logs={logs?.[`${name} ${key}`]} stdoutLogs={stdoutLogs?.[`${name} ${key}`]}
showLogs={showLogs}
autoRefresh={autoRefresh} updateState={updateState} state={state} horizontal={horizontal}
onReload={onReload} />)}
4 changes: 2 additions & 2 deletions applications/tari_swarm_daemon/webui/src/utils/json_rpc.tsx
@@ -28,9 +28,9 @@ export async function jsonRpc(method: string, args: any = {}) {

json_id += 1;

const address = import.meta.env.VITE_DAEMON_JRPC_ADDRESS || "";
const address = import.meta.env.VITE_JSON_RPC_ADDRESS || import.meta.env.VITE_JRPC_ADDRESS || "/json_rpc";
const headers: { [key: string]: string } = { "Content-Type": "application/json" };
const response = await fetch(`${address}/json_rpc`, {
const response = await fetch(address, {
method: "POST",
body: JSON.stringify({
method: method,
3 changes: 1 addition & 2 deletions applications/tari_validator_node/src/bootstrap.rs
@@ -177,8 +177,6 @@ pub async fn spawn_services(
handles.push(join_handle);

info!(target: LOG_TARGET, "Message logging initializing");
// Spawn messaging
let message_logger = SqliteMessageLogger::new(config.validator_node.data_dir.join("message_log.sqlite"));

info!(target: LOG_TARGET, "State store initializing");
// Connect to shard db
@@ -230,6 +228,7 @@ pub async fn spawn_services(
};

// Messaging
let message_logger = SqliteMessageLogger::new(config.validator_node.data_dir.join("message_log.sqlite"));
let local_address = PeerAddress::from(keypair.public_key().clone());
let (loopback_sender, loopback_receiver) = mpsc::unbounded_channel();
let inbound_messaging = ConsensusInboundMessaging::new(