Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: review atoma daemon api docs and do not panic when connecting to proxy server (at spawn) #297

Merged
merged 4 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 35 additions & 9 deletions atoma-bin/atoma_node.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{path::Path, str::FromStr, sync::Arc};
use std::{
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
};

use anyhow::{Context, Result};
use atoma_confidential::AtomaConfidentialComputeService;
Expand All @@ -16,7 +20,7 @@ use dotenv::dotenv;
use futures::future::try_join_all;
use hf_hub::{api::sync::ApiBuilder, Repo, RepoType};
use sui_keys::keystore::FileBasedKeystore;
use sui_sdk::types::base_types::ObjectID;
use sui_sdk::{types::base_types::ObjectID, wallet_context::WalletContext};
use tokenizers::Tokenizer;
use tokio::{
net::TcpListener,
Expand Down Expand Up @@ -47,8 +51,8 @@ const DAEMON_LOG_FILE: &str = "atoma-daemon.log";
#[derive(Parser)]
struct Args {
/// Index of the address to use from the keystore
#[arg(short, long, default_value_t = 0)]
address_index: usize,
#[arg(short, long)]
address_index: Option<usize>,

/// Path to the configuration file
#[arg(short, long)]
Expand Down Expand Up @@ -184,6 +188,19 @@ async fn main() -> Result<()> {

let keystore = FileBasedKeystore::new(&config.sui.sui_keystore_path().into())
.context("Failed to initialize keystore")?;
let mut wallet_ctx = WalletContext::new(
&PathBuf::from(config.sui.sui_config_path()),
config.sui.request_timeout(),
config.sui.max_concurrent_requests(),
)?;
let address = wallet_ctx.active_address()?;
let address_index = args.address_index.unwrap_or(
wallet_ctx
.get_addresses()
.iter()
.position(|a| a == &address)
.unwrap(),
);

info!(
target = "atoma-node-service",
Expand Down Expand Up @@ -213,6 +230,19 @@ async fn main() -> Result<()> {
let (app_state_encryption_sender, _app_state_encryption_receiver) =
tokio::sync::mpsc::unbounded_channel();

for (_, node_small_id) in config.daemon.node_badges.iter() {
if let Err(e) =
register_on_proxy(&config.proxy, *node_small_id, &keystore, address_index).await
{
error!(
target = "atoma-node-service",
event = "register_on_proxy_error",
error = ?e,
"Failed to register on proxy server"
);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we exit the application? this means the proxy doesn't know the address of the node, so it can't call the endpoints

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I honestly don't know what's best. This overall approach is not good, we need a p2p network for public url sharing..

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, agreed

}
}

info!(
target = "atoma-node-service",
event = "confidential_compute_service_spawn",
Expand All @@ -223,10 +253,6 @@ async fn main() -> Result<()> {
AtomaSuiClient::new_from_config(args.config_path).await?,
));

for (_, node_small_id) in config.daemon.node_badges.iter() {
register_on_proxy(&config.proxy, *node_small_id, &keystore, args.address_index).await?;
}

let (compute_shared_secret_sender, _compute_shared_secret_receiver) =
tokio::sync::mpsc::unbounded_channel();

Expand Down Expand Up @@ -311,7 +337,7 @@ async fn main() -> Result<()> {
.image_generations_service_url
.context("Image generations service URL not configured")?,
keystore: Arc::new(keystore),
address_index: args.address_index,
address_index: address_index,
};

let daemon_app_state = DaemonState {
Expand Down
16 changes: 2 additions & 14 deletions atoma-daemon/docs/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,6 @@ components:
required:
- task_small_id
- price_per_one_million_compute_units
- max_num_compute_units
properties:
gas:
type: string
Expand All @@ -656,11 +655,6 @@ components:
Optional gas price.
If not provided, the default is `None`.
minimum: 0
max_num_compute_units:
type: integer
format: int64
description: The maximum number of compute units.
minimum: 0
node_badge_id:
type: string
description: |-
Expand All @@ -669,7 +663,7 @@ components:
price_per_one_million_compute_units:
type: integer
format: int64
description: The price per compute unit.
description: The price per one million compute units.
minimum: 0
task_small_id:
type: integer
Expand Down Expand Up @@ -752,7 +746,6 @@ components:
required:
- task_small_id
- price_per_one_million_compute_units
- max_num_compute_units
properties:
gas:
type: string
Expand All @@ -777,11 +770,6 @@ components:
Optional gas price.
If not provided, the default is `None`.
minimum: 0
max_num_compute_units:
type: integer
format: int64
description: The maximum number of compute units.
minimum: 0
node_badge_id:
type: string
description: |-
Expand All @@ -790,7 +778,7 @@ components:
price_per_one_million_compute_units:
type: integer
format: int64
description: The price per compute unit.
description: The price per one million compute units.
minimum: 0
task_small_id:
type: integer
Expand Down
4 changes: 2 additions & 2 deletions atoma-daemon/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub struct NodeTaskSubscriptionRequest {
#[schema(value_type = String)]
pub node_badge_id: Option<ObjectID>,

/// The price per compute unit.
/// The price per one million compute units.
pub price_per_one_million_compute_units: u64,

/// Optional gas object ID.
Expand Down Expand Up @@ -139,7 +139,7 @@ pub struct NodeTaskUpdateSubscriptionRequest {
#[schema(value_type = String)]
pub node_badge_id: Option<ObjectID>,

/// The price per compute unit.
/// The price per one million compute units.
pub price_per_one_million_compute_units: u64,

/// Optional gas object ID.
Expand Down
4 changes: 2 additions & 2 deletions config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ node_badges = [
] # List of node badges, where each badge is a tuple of (badge_id, small_id), both values are assigned once the node registers itself

[proxy_server]
# replace this with the address of the proxy server
# replace this with the public url address of the proxy server
proxy_address = ""
# replace this with the public address of this node
# replace this with the public url address of this node
node_public_address = ""
# replace this with the country of the node
country = ""