Skip to content

Commit

Permalink
checkpointing
Browse files Browse the repository at this point in the history
  • Loading branch information
nicarq committed Oct 16, 2023
1 parent 9534cba commit 235cfcb
Show file tree
Hide file tree
Showing 14 changed files with 398 additions and 65 deletions.
3 changes: 3 additions & 0 deletions files/proxy_identities_test.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
identity1,127.0.0.1:8000,127.0.0.1:8001
identity2,127.0.0.1:8002,127.0.0.1:8003
identity3,127.0.0.1:8004,127.0.0.1:8005
3 changes: 2 additions & 1 deletion src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ pub mod db_toolkits;
pub mod db_utils;
pub mod db_retry;
pub mod db_files_transmission;
pub mod db_job_queue;
pub mod db_job_queue;
pub mod db_proxy;
3 changes: 2 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::utils::qr_code_setup::generate_qr_codes;
use async_channel::{bounded, Receiver, Sender};
use ed25519_dalek::{PublicKey as SignaturePublicKey, SecretKey as SignatureStaticKey};
use network::Node;
use network::node::NodeProxyMode;
use shinkai_message_primitives::shinkai_message::shinkai_message_schemas::{IdentityPermissions, RegistrationCodeType};
use shinkai_message_primitives::shinkai_utils::encryption::{
encryption_public_key_to_string, encryption_secret_key_to_string,
Expand Down Expand Up @@ -112,7 +113,7 @@ fn main() {
db_path,
node_env.first_device_needs_registration_code,
initial_agent,
None // TODO: Add a way to pass proxy settings from env
NodeProxyMode::NoProxy // TODO: Add a way to pass proxy settings from env
)
.await
}),
Expand Down
3 changes: 2 additions & 1 deletion src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ pub mod node_internal_commands;
pub mod node_api_commands;
pub mod node_local_commands;
pub mod node_api;
pub mod node_error;
pub mod node_error;
pub mod node_proxy;
72 changes: 36 additions & 36 deletions src/network/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use core::panic;
use ed25519_dalek::{PublicKey as SignaturePublicKey, SecretKey as SignatureStaticKey};
use futures::{future::FutureExt, pin_mut, prelude::*, select};
use log::{debug, error, info, trace, warn};
use serde::{Deserialize, Serialize};
use shinkai_message_primitives::schemas::agents::serialized_agent::SerializedAgent;
use shinkai_message_primitives::schemas::inbox_name::InboxNameError;
use shinkai_message_primitives::schemas::shinkai_name::ShinkaiName;
Expand Down Expand Up @@ -41,6 +42,7 @@ use crate::schemas::smart_inbox::SmartInbox;

use super::node_api::{APIError, APIUseRegistrationCodeSuccessResponse};
use super::node_error::NodeError;
use super::node_proxy::NodeProxyMode;

pub enum NodeCommand {
Shutdown,
Expand Down Expand Up @@ -222,36 +224,6 @@ pub enum NodeCommand {
// A type alias for a string that represents a profile name.
type ProfileName = String;

pub enum NodeProxyMode {
// Node acts as a proxy, holds identities it proxies for
// and a flag indicating if it allows new identities
// if the flag is also then it will also clean up saved identities
IsProxy(ProxyMode),
// Node is being proxied, holds its proxy's identity
IsProxied(ProxyIdentity),
// Node is not using a proxy
NoProxy,
}

#[derive(Clone, Debug)]
pub struct ProxyMode {
// Flag indicating if new identities can be added
pub allow_new_identities: bool,
// Starting node identities
pub proxy_node_identities: HashMap<String, ProxyIdentity>,
}

#[derive(Clone, Debug)]
pub struct ProxyIdentity {
// Address of the API proxy
pub api_peer: SocketAddr,
// Address of the TCP proxy
pub tcp_peer: SocketAddr,
// Name of the proxied node
// Or the name of my identity proxied
pub shinkai_name: ShinkaiName,
}

// The `Node` struct represents a single node in the network.
pub struct Node {
// The mode of the node
Expand Down Expand Up @@ -799,13 +771,13 @@ impl Node {
);

// Extract sender's public keys and verify the signature
let sender_profile_name_string = ShinkaiName::from_shinkai_message_only_using_sender_node_name(&message)
let sender_node_name_string = ShinkaiName::from_shinkai_message_only_using_sender_node_name(&message)
.unwrap()
.get_node_name();
let sender_identity = maybe_identity_manager
.lock()
.await
.external_profile_to_global_identity(&sender_profile_name_string)
.external_profile_to_global_identity(&sender_node_name_string)
.await
.unwrap();

Expand All @@ -816,7 +788,7 @@ impl Node {
ShinkaiLogLevel::Debug,
&format!(
"{} > Sender Profile Name: {:?}",
receiver_address, sender_profile_name_string
receiver_address, sender_node_name_string
),
);
shinkai_log(
Expand All @@ -836,8 +808,24 @@ impl Node {
);

// TODO(Nico): split this part depending on Proxy Mode

// Save to db
// If we are a proxy, we need to check if the message is for one of our proxied identities

// TODO: add handle_based_on_message_content_and_encryption back
Ok(())
}

pub async fn handle_message_no_proxy(
message: ShinkaiMessage,
sender_node_name_string: String,
// sender_identity: GlobalIdentity,
my_encryption_secret_key: EncryptionStaticKey,
my_signature_secret_key: SignatureStaticKey,
my_node_profile_name: String,
maybe_db: Arc<Mutex<ShinkaiDB>>,
maybe_identity_manager: Arc<Mutex<IdentityManager>>,
receiver_address: SocketAddr,
unsafe_sender_address: SocketAddr,
) -> Result<(), NodeError> {
{
Node::save_to_db(
false,
Expand All @@ -853,7 +841,7 @@ impl Node {
message.clone(),
sender_identity.node_encryption_public_key,
sender_identity.addr.clone().unwrap(),
sender_profile_name_string,
sender_node_name_string,
&my_encryption_secret_key,
&my_signature_secret_key,
&my_node_profile_name,
Expand All @@ -864,4 +852,16 @@ impl Node {
)
.await
}

pub async fn proxied_handle_message(
message: ShinkaiMessage,
sender_node_name_string: String,
maybe_db: Arc<Mutex<ShinkaiDB>>,
unsafe_sender_address: SocketAddr,
) -> Result<(), NodeError> {
// check if the message is for one of our proxied identities
// if so then send it to the proxied identity
//
Ok(())
}
}
111 changes: 104 additions & 7 deletions src/network/node_api_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{convert::TryInto, sync::Arc};
use super::{
node_api::{APIError, APIUseRegistrationCodeSuccessResponse},
node_error::NodeError,
Node,
Node, node_proxy::NodeProxyMode,
};
use crate::{
db::db_errors::ShinkaiDBError,
Expand Down Expand Up @@ -44,7 +44,8 @@ use shinkai_message_primitives::{
clone_static_secret_key, decrypt_with_chacha20poly1305, encryption_public_key_to_string,
encryption_secret_key_to_string, string_to_encryption_public_key, EncryptionMethod,
},
signatures::{clone_signature_secret_key, signature_public_key_to_string, string_to_signature_public_key}, shinkai_logging::{shinkai_log, ShinkaiLogOption, ShinkaiLogLevel},
shinkai_logging::{shinkai_log, ShinkaiLogLevel, ShinkaiLogOption},
signatures::{clone_signature_secret_key, signature_public_key_to_string, string_to_signature_public_key},
},
};
use std::pin::Pin;
Expand Down Expand Up @@ -715,7 +716,11 @@ impl Node {
shinkai_log(
ShinkaiLogOption::Identity,
ShinkaiLogLevel::Info,
format!("registration code usage> first device needs registration code?: {:?}", self.first_device_needs_registration_code).as_str(),
format!(
"registration code usage> first device needs registration code?: {:?}",
self.first_device_needs_registration_code
)
.as_str(),
);

let main_profile_exists = match db.main_profile_exists(self.node_profile_name.get_node_name().as_str()) {
Expand All @@ -735,7 +740,11 @@ impl Node {
shinkai_log(
ShinkaiLogOption::Identity,
ShinkaiLogLevel::Debug,
format!("registration code usage> main_profile_exists: {:?}", main_profile_exists).as_str(),
format!(
"registration code usage> main_profile_exists: {:?}",
main_profile_exists
)
.as_str(),
);

if self.first_device_needs_registration_code == false {
Expand Down Expand Up @@ -917,7 +926,7 @@ impl Node {
if main_profile_exists == false && self.initial_agent.is_some() {
std::mem::drop(identity_manager);
self.internal_add_agent(self.initial_agent.clone().unwrap()).await?;
}
}

let success_response = APIUseRegistrationCodeSuccessResponse {
message: success,
Expand Down Expand Up @@ -1641,8 +1650,11 @@ impl Node {
ShinkaiLogLevel::Debug,
format!(
"api_add_file_to_inbox_with_symmetric_key> filename: {}, hex_blake3_hash: {}, decrypted_file.len(): {}",
filename, hex_blake3_hash, decrypted_file.len()
).as_str()
filename,
hex_blake3_hash,
decrypted_file.len()
)
.as_str(),
);

match self
Expand All @@ -1668,6 +1680,91 @@ impl Node {
}
}

// TODO: move to new file node_proxy.rs
pub async fn handle_send_message(
&self,
potentially_encrypted_msg: ShinkaiMessage,
res: Sender<Result<(), APIError>>,
) -> Result<(), NodeError> {
match &self.proxy_mode {
NodeProxyMode::IsProxied(_) => {
// I received the message! so we are already good
self.api_handle_send_onionized_message(potentially_encrypted_msg, res)
.await
}
NodeProxyMode::IsProxy(_) => {
// send_msg_handler API -> Node
// Check who is the receiver
// Check that the receiver is part of the identities
// Send the message to the receiver using our stored Addr
let recipient_node = ShinkaiName::from_shinkai_message_only_using_recipient_node_name(
&potentially_encrypted_msg.clone(),
);
match recipient_node {
Ok(recipient_node_name) => {
let result = self.db.lock().await.get_proxied_identity(&recipient_node_name);
match result {
Ok(Some(proxied_identity)) => {
let api_peer = proxied_identity.api_peer;
let client = reqwest::Client::new();
let res = client
.post(format!("http://{}:{}/v1/send", api_peer.ip(), api_peer.port()))
.json(&potentially_encrypted_msg)
.send()
.await;
match res {
Ok(response) => {
if response.status().is_success() {
Ok(())
} else {
Err(NodeError {
message: format!(
"Failed to send message to peer: {}",
response.status()
),
})
}
}
Err(err) => Err(NodeError {
message: format!("Failed to send message to peer: {}", err),
}),
}
}
Ok(None) => {
shinkai_log(
ShinkaiLogOption::Node,
ShinkaiLogLevel::Debug,
format!("No proxied identity found for node: {}", recipient_node_name).as_str(),
);
Ok(())
}
Err(err) => {
shinkai_log(
ShinkaiLogOption::Node,
ShinkaiLogLevel::Error,
format!("Error getting proxied identity: {}", err).as_str(),
);
Ok(())
}
}
}
Err(_) => {
shinkai_log(
ShinkaiLogOption::Node,
ShinkaiLogLevel::Error,
"Error getting recipient node name from message",
);
return Ok(());
}
}
}
NodeProxyMode::NoProxy => {
self.api_handle_send_onionized_message(potentially_encrypted_msg, res)
.await
}
}
}

pub async fn api_handle_send_onionized_message(
&self,
potentially_encrypted_msg: ShinkaiMessage,
Expand Down
37 changes: 37 additions & 0 deletions src/network/node_proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::{collections::HashMap, net::SocketAddr};

use serde::{Serialize, Deserialize};
use shinkai_message_primitives::schemas::shinkai_name::ShinkaiName;


#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum NodeProxyMode {
// Node acts as a proxy, holds identities it proxies for
// and a flag indicating if it allows new identities
// if the flag is also then it will also clean up saved identities
IsProxy(IsProxyConf),
// Node is being proxied, holds its proxy's identity
IsProxied(ProxyIdentity),
// Node is not using a proxy
NoProxy,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct IsProxyConf {
// Flag indicating if new identities can be added
pub allow_new_identities: bool,
// Starting node identities
pub proxy_node_identities: HashMap<String, ProxyIdentity>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ProxyIdentity {
// Address of the API proxy
pub api_peer: SocketAddr,
// Address of the TCP proxy
pub tcp_peer: SocketAddr,
// Name of the proxied node
// Or the name of my identity proxied
pub shinkai_name: ShinkaiName,
}

Loading

0 comments on commit 235cfcb

Please sign in to comment.