Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
nicarq committed Jan 20, 2024
1 parent 7a71a8f commit 157bccd
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ base64 = "0.13.0"
ethers = "2.0"
dashmap = "5.5.3"
tiny-bip39 = "0.8.0"
tracing = "0.1.40"

[dependencies.rocksdb]
version = "0.21.0"
Expand Down
4 changes: 4 additions & 0 deletions src/agent/execution/chains/inference_chain_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use shinkai_vector_resources::embedding_generator::EmbeddingGenerator;
use std::result::Result::Ok;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;
use tracing::instrument;

use super::cron_creation_chain::CronCreationChainResponse;
use super::cron_execution_chain::CronExecutionChainResponse;
Expand All @@ -28,6 +29,7 @@ impl JobManager {
/// Chooses an inference chain based on the job message (using the agent's LLM)
/// and then starts using the chosen chain.
/// Returns the final String result from the inferencing, and a new execution context.
#[instrument(skip(generator, db))]
pub async fn inference_chain_router(
db: Arc<Mutex<ShinkaiDB>>,
agent_found: Option<SerializedAgent>,
Expand Down Expand Up @@ -76,6 +78,7 @@ impl JobManager {
// TODO: Delete this
// TODO: Merge this with the above function. We are not doing that right now bc we need to decide how to select Chains.
// Could it be based on the first message of the Job?
#[instrument(skip(db))]
pub async fn alt_inference_chain_router(
db: Arc<Mutex<ShinkaiDB>>,
agent_found: Option<SerializedAgent>,
Expand Down Expand Up @@ -132,6 +135,7 @@ impl JobManager {
Ok((inference_response_content, new_execution_context))
}

#[instrument(skip(db, chosen_chain))]
pub async fn cron_inference_chain_router_summary(
db: Arc<Mutex<ShinkaiDB>>,
agent_found: Option<SerializedAgent>,
Expand Down
3 changes: 2 additions & 1 deletion src/agent/execution/chains/qa_inference_chain.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::agent::agent::Agent;
use crate::agent::error::AgentError;
use crate::agent::execution::job_prompts::JobPromptGenerator;
use crate::agent::file_parsing::ParsingHelper;
Expand All @@ -13,11 +12,13 @@ use shinkai_vector_resources::embeddings::MAX_EMBEDDING_STRING_SIZE;
use std::result::Result::Ok;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;
use tracing::instrument;

impl JobManager {
/// An inference chain for question-answer job tasks which vector searches the Vector Resources
/// in the JobScope to find relevant content for the LLM to use at each step.
#[async_recursion]
#[instrument(skip(generator, db))]
pub async fn start_qa_inference_chain(
db: Arc<Mutex<ShinkaiDB>>,
full_job: Job,
Expand Down
2 changes: 2 additions & 0 deletions src/agent/execution/job_execution_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ use std::result::Result::Ok;
use std::time::Instant;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;
use tracing::instrument;

impl JobManager {
/// Processes a job message which will trigger a job step
#[instrument(skip(identity_secret_key, db))]
pub async fn process_job_message_queued(
job_message: JobForProcessing,
db: Arc<Mutex<ShinkaiDB>>,
Expand Down
40 changes: 24 additions & 16 deletions src/network/node_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,24 @@ impl From<String> for APIError {
impl warp::reject::Reject for APIError {}

pub async fn run_api(node_commands_sender: Sender<NodeCommand>, address: SocketAddr, node_name: String) {
println!("Starting Node API server at: {}", &address);
shinkai_log(
ShinkaiLogOption::API,
ShinkaiLogLevel::Info,
&format!("Starting Node API server at: {}", &address),
);

let log = warp::log::custom(|info| {
eprintln!(
"ip: {:?}, method: {:?}, path: {:?}, status: {:?}, elapsed: {:?}",
info.remote_addr(),
info.method(),
info.path(),
info.status(),
info.elapsed(),
shinkai_log(
ShinkaiLogOption::API,
ShinkaiLogLevel::Debug,
&format!(
"ip: {:?}, method: {:?}, path: {:?}, status: {:?}, elapsed: {:?}",
info.remote_addr(),
info.method(),
info.path(),
info.status(),
info.elapsed(),
),
);
});

Expand Down Expand Up @@ -406,12 +414,6 @@ pub async fn run_api(node_commands_sender: Sender<NodeCommand>, address: SocketA
.with(cors);

warp::serve(routes).run(address).await;

shinkai_log(
ShinkaiLogOption::API,
ShinkaiLogLevel::Info,
&format!("Server successfully started at: {}", &address),
);
}

async fn handle_node_command<T, U>(
Expand Down Expand Up @@ -526,7 +528,10 @@ async fn send_msg_handler(
})
.await
.map_err(|e| warp::reject::custom(APIError::from(e)))?;
let send_result = res_send_msg_receiver.recv().await.map_err(|e| warp::reject::custom(APIError::from(format!("{}", e))))?;
let send_result = res_send_msg_receiver
.recv()
.await
.map_err(|e| warp::reject::custom(APIError::from(format!("{}", e))))?;
match send_result {
Ok(_) => {
let resp = warp::reply::json(&"Message sent successfully");
Expand Down Expand Up @@ -947,7 +952,10 @@ async fn handle_rejection(err: warp::Rejection) -> Result<impl warp::Reply, warp
eprintln!("rejection: {:?}", err);
if let Some(api_error) = err.find::<APIError>() {
let json = warp::reply::json(api_error);
return Ok(warp::reply::with_status(json, StatusCode::from_u16(api_error.code).unwrap()));
return Ok(warp::reply::with_status(
json,
StatusCode::from_u16(api_error.code).unwrap(),
));
} else if err.is_not_found() {
let json = warp::reply::json(&APIError::new(
StatusCode::NOT_FOUND,
Expand Down
1 change: 0 additions & 1 deletion src/network/node_api_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,6 @@ impl Node {
}
};

eprintln!("api_get_all_inboxes_for_profile> msg: {:?}", msg);
let profile_requested: String = msg.get_message_content()?;

// Check that the message is coming from someone with the right permissions to do this action
Expand Down
2 changes: 0 additions & 2 deletions src/network/node_internal_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,13 +287,11 @@ impl Node {
shinkai_message: ShinkaiMessage,
sender: Identity,
) -> Result<String, NodeError> {
println!("Creating new job");
let job_manager = self.job_manager.as_ref().expect("JobManager not initialized");
match job_manager.lock().await.process_job_message(shinkai_message).await {
Ok(job_id) => {
{
let inbox_name = InboxName::get_job_inbox_name_from_params(job_id.clone()).unwrap();
println!("Adding permission for inbox: {}", inbox_name.to_string());
let sender_standard = match sender {
Identity::Standard(std_identity) => std_identity,
_ => {
Expand Down

0 comments on commit 157bccd

Please sign in to comment.