Skip to content

Commit

Permalink
refactor: docker client to return list of node-instance-info
Browse files Browse the repository at this point in the history
  • Loading branch information
bochaco committed Jan 22, 2025
1 parent b2a0e68 commit 3bfcb87
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 95 deletions.
75 changes: 26 additions & 49 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ use super::{
stats::AggregatedStatsView,
};

#[cfg(feature = "ssr")]
use super::docker_msgs::Container;
#[cfg(feature = "ssr")]
use axum::extract::FromRef;
#[cfg(feature = "hydrate")]
Expand Down Expand Up @@ -105,8 +103,8 @@ const HOME_NETWORK_ONLY: &str = "HOME_NETWORK_ONLY";
#[derive(Clone, Debug)]
pub enum BgTasksCmds {
ApplySettings(AppSettings),
CheckBalanceFor(Container),
DeleteBalanceFor(Container),
CheckBalanceFor(NodeInstanceInfo),
DeleteBalanceFor(NodeInstanceInfo),
CheckAllBalances,
}

Expand Down
43 changes: 18 additions & 25 deletions src/bg_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use super::{
app::{AppSettings, BgTasksCmds, ImmutableNodeStatus, Stats, METRICS_MAX_SIZE_PER_CONTAINER},
db_client::DbClient,
docker_client::DockerClient,
docker_msgs::Container,
lcd::display_stats_on_lcd,
metrics_client::{NodeMetricsClient, NodesMetrics},
node_instance::NodeInstanceInfo,
Expand Down Expand Up @@ -233,13 +232,12 @@ pub fn spawn_bg_tasks(

*server_api_hit.lock().await = false;
match docker_client.get_containers_list(true).await {
Ok(containers) => {
let total_nodes = containers.len();
Ok(nodes) => {
let total_nodes = nodes.len();
let mut num_active_nodes = 0;
let mut num_inactive_nodes = 0;

for container in containers.into_iter() {
let node_info: NodeInstanceInfo = container.into();
for node_info in nodes.into_iter() {
update_node_metadata(&node_info, &db_client, &node_status_locked).await;
if node_info.status.is_active() {
num_active_nodes += 1;
Expand Down Expand Up @@ -375,15 +373,15 @@ async fn update_nodes_info(
lcd_stats: &Arc<Mutex<HashMap<String, String>>>,
global_stats: Arc<Mutex<Stats>>,
) {
let containers = docker_client
let nodes = docker_client
.get_containers_list(true)
.await
.unwrap_or_else(|err| {
logging::log!("Failed to get containers list: {err}");
logging::log!("Failed to get nodes list: {err}");
vec![]
});

let num_nodes = containers.len();
let num_nodes = nodes.len();
if num_nodes > 0 {
logging::log!("Fetching status and metrics from {num_nodes} node/s ...");
}
Expand All @@ -398,9 +396,7 @@ async fn update_nodes_info(
let mut shunned_count = 0;
let mut bin_version = HashSet::<String>::new();

for container in containers.into_iter() {
let mut node_info: NodeInstanceInfo = container.into();

for mut node_info in nodes.into_iter() {
if node_info.status.is_active() {
num_active_nodes += 1;

Expand Down Expand Up @@ -490,17 +486,16 @@ async fn update_nodes_info(

// Prune metrics records from the cache DB to always keep the number of records within a limit.
async fn prune_metrics(docker_client: DockerClient, db_client: DbClient) {
let containers = match docker_client.get_containers_list(false).await {
Ok(containers) if !containers.is_empty() => containers,
let nodes = match docker_client.get_containers_list(false).await {
Ok(nodes) if !nodes.is_empty() => nodes,
Err(err) => {
logging::log!("Failed to get containers list: {err}");
logging::log!("Failed to get nodes list: {err}");
return;
}
_ => return,
};

for container in containers.into_iter() {
let node_info: NodeInstanceInfo = container.into();
for node_info in nodes.into_iter() {
logging::log!(
"Removing oldest metrics from DB for node {} ...",
node_info.short_container_id()
Expand Down Expand Up @@ -578,10 +573,10 @@ async fn balance_checker_task(
let _ = bg_tasks_cmds_tx.send(BgTasksCmds::CheckAllBalances);
}
}
Ok(BgTasksCmds::CheckBalanceFor(container)) => {
Ok(BgTasksCmds::CheckBalanceFor(node_info)) => {
if let Some(ref token_contract) = token_contract {
retrieve_current_balances(
[container],
[node_info],
token_contract,
&db_client,
&mut updated_balances,
Expand All @@ -597,8 +592,7 @@ async fn balance_checker_task(
global_stats.lock().await.total_balance = total_balance;
}
}
Ok(BgTasksCmds::DeleteBalanceFor(container)) => {
let node_info: NodeInstanceInfo = container.into();
Ok(BgTasksCmds::DeleteBalanceFor(node_info)) => {
if let Some(Ok(address)) = node_info
.rewards_addr
.as_ref()
Expand All @@ -619,9 +613,9 @@ async fn balance_checker_task(
let mut total_balance = U256::from(0u64);
if let Some(ref token_contract) = token_contract {
match docker_client.get_containers_list(true).await {
Ok(containers) if !containers.is_empty() => {
Ok(nodes) if !nodes.is_empty() => {
retrieve_current_balances(
containers,
nodes,
token_contract,
&db_client,
&mut updated_balances,
Expand Down Expand Up @@ -653,13 +647,12 @@ async fn balance_checker_task(
}

async fn retrieve_current_balances<T: Transport + Clone, P: Provider<T, N>, N: Network>(
containers: impl IntoIterator<Item = Container>,
nodes: impl IntoIterator<Item = NodeInstanceInfo>,
token_contract: &TokenContract::TokenContractInstance<T, P, N>,
db_client: &DbClient,
updated_balances: &mut HashMap<Address, U256>,
) {
for container in containers.into_iter() {
let node_info: NodeInstanceInfo = container.into();
for node_info in nodes.into_iter() {
let node_short_id = node_info.short_container_id();
if let Some(Ok(address)) = node_info
.rewards_addr
Expand Down
14 changes: 9 additions & 5 deletions src/docker_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use super::{docker_msgs::*, node_instance::ContainerId};
use super::{
docker_msgs::*,
node_instance::{ContainerId, NodeInstanceInfo},
};

use axum::body::Body;
use bytes::Bytes;
Expand Down Expand Up @@ -210,7 +213,7 @@ impl DockerClient {
pub async fn get_container_info(
&self,
id: &ContainerId,
) -> Result<Container, DockerClientError> {
) -> Result<NodeInstanceInfo, DockerClientError> {
let mut filters = HashMap::default();
filters.insert("id".to_string(), vec![id.clone()]);
let containers = self.list_containers(&filters, true).await?;
Expand All @@ -225,7 +228,7 @@ impl DockerClient {
pub async fn get_containers_list(
&self,
all: bool,
) -> Result<Vec<Container>, DockerClientError> {
) -> Result<Vec<NodeInstanceInfo>, DockerClientError> {
let mut filters = HashMap::default();
filters.insert("label".to_string(), vec![LABEL_KEY_VERSION.to_string()]);
self.list_containers(&filters, all).await
Expand All @@ -236,7 +239,7 @@ impl DockerClient {
&self,
filters: &HashMap<String, Vec<String>>,
all: bool,
) -> Result<Vec<Container>, DockerClientError> {
) -> Result<Vec<NodeInstanceInfo>, DockerClientError> {
let url = format!("{DOCKER_CONTAINERS_API}/json");
let all_str = all.to_string();
let query = &[
Expand All @@ -245,7 +248,8 @@ impl DockerClient {
];
let resp_bytes = self.send_request(ReqMethod::Get, &url, query).await?;
let containers: Vec<Container> = serde_json::from_slice(&resp_bytes)?;
Ok(containers)
let nodes = containers.into_iter().map(|c| c.into()).collect();
Ok(nodes)
}

// Request the Docker server to DELETE a container matching the given id
Expand Down
Loading

0 comments on commit 3bfcb87

Please sign in to comment.