diff --git a/shinkai-bin/shinkai-node/src/llm_provider/execution/job_execution_core.rs b/shinkai-bin/shinkai-node/src/llm_provider/execution/job_execution_core.rs index 4da4c9677..5636249c9 100644 --- a/shinkai-bin/shinkai-node/src/llm_provider/execution/job_execution_core.rs +++ b/shinkai-bin/shinkai-node/src/llm_provider/execution/job_execution_core.rs @@ -275,6 +275,8 @@ impl JobManager { &format!("Inference chain - Processing Job: {:?}", full_job.job_id), ); + eprintln!("Full job: {:?}", full_job); + // Retrieve image files from the message // Note: this could be other type of files later on e.g. video, audio, etc. let image_files = JobManager::get_image_files_from_message(vector_fs.clone(), &job_message).await?; @@ -677,21 +679,26 @@ impl JobManager { // Create a mutable copy of full_job let mut mutable_job = full_job.clone(); - // TODO: fix this once we have uploaded files - // // Update the job scope based on the ProcessedInput - // for (file_path, file_name) in &input_string.local_files { - // let local_entry = LocalScopeVRKaiEntry { - // vrkai: VRKai::new( - // BaseVectorResource::Document(Document::new( - // file_name.clone(), - // VRSourceReference::LocalFile(file_path.clone()), - // vec![], - // )), - // vec![], - // ), - // }; - // mutable_job.scope.local_vrkai.push(local_entry); - // } + if input_string.uploaded_files.len() > 0 { + // Decompose the uploaded_files into two separate vectors + let (files_inbox, file_names): (Vec, Vec) = + input_string.uploaded_files.iter().cloned().unzip(); + + Self::process_specified_files_for_vector_resources( + db.clone(), + vector_fs.clone(), + files_inbox.first().unwrap().clone(), + file_names, + None, + &mut mutable_job, + user_profile.clone(), + None, + generator.clone(), + ) + .await?; + } + + eprintln!("full_job: {:?}", mutable_job); for (local_file_path, local_file_name) in &input_string.local_files { let vector_fs_entry = VectorFSItemScopeEntry { @@ -807,8 +814,106 @@ impl JobManager { } } - /// Processes the files sent together with the current job_message into Vector Resources, - /// and saves them either into the local job scope, or the DB depending on `save_to_db_directly`. + /// Helper function to process files and update the job scope. + async fn process_files_and_update_scope( + db: Arc, + vector_fs: Arc, + files: Vec<(String, Vec)>, + agent_found: Option, + full_job: &mut Job, + profile: ShinkaiName, + save_to_vector_fs_folder: Option, + generator: RemoteEmbeddingGenerator, + ) -> Result<(), LLMProviderError> { + // Process the files + let new_scope_entries_result = JobManager::process_files_inbox( + db.clone(), + vector_fs.clone(), + agent_found, + files, + profile, + save_to_vector_fs_folder, + generator, + ) + .await; + + match new_scope_entries_result { + Ok(new_scope_entries) => { + for (_, value) in new_scope_entries { + match value { + ScopeEntry::LocalScopeVRKai(local_entry) => { + if !full_job.scope.local_vrkai.contains(&local_entry) { + full_job.scope.local_vrkai.push(local_entry); + } else { + shinkai_log( + ShinkaiLogOption::JobExecution, + ShinkaiLogLevel::Error, + "Duplicate LocalScopeVRKaiEntry detected", + ); + } + } + ScopeEntry::LocalScopeVRPack(local_entry) => { + if !full_job.scope.local_vrpack.contains(&local_entry) { + full_job.scope.local_vrpack.push(local_entry); + } else { + shinkai_log( + ShinkaiLogOption::JobExecution, + ShinkaiLogLevel::Error, + "Duplicate LocalScopeVRPackEntry detected", + ); + } + } + ScopeEntry::VectorFSItem(fs_entry) => { + if !full_job.scope.vector_fs_items.contains(&fs_entry) { + full_job.scope.vector_fs_items.push(fs_entry); + } else { + shinkai_log( + ShinkaiLogOption::JobExecution, + ShinkaiLogLevel::Error, + "Duplicate VectorFSScopeEntry detected", + ); + } + } + ScopeEntry::VectorFSFolder(fs_entry) => { + if !full_job.scope.vector_fs_folders.contains(&fs_entry) { + full_job.scope.vector_fs_folders.push(fs_entry); + } else { + shinkai_log( + ShinkaiLogOption::JobExecution, + ShinkaiLogLevel::Error, + "Duplicate VectorFSScopeEntry detected", + ); + } + } + ScopeEntry::NetworkFolder(nf_entry) => { + if !full_job.scope.network_folders.contains(&nf_entry) { + full_job.scope.network_folders.push(nf_entry); + } else { + shinkai_log( + ShinkaiLogOption::JobExecution, + ShinkaiLogLevel::Error, + "Duplicate VectorFSScopeEntry detected", + ); + } + } + } + } + db.update_job_scope(full_job.job_id().to_string(), full_job.scope.clone())?; + } + Err(e) => { + shinkai_log( + ShinkaiLogOption::JobExecution, + ShinkaiLogLevel::Error, + format!("Error processing files: {}", e).as_str(), + ); + return Err(e); + } + } + + Ok(()) + } + + /// Processes the files sent together with the current job_message into Vector Resources. #[allow(clippy::too_many_arguments)] pub async fn process_job_message_files_for_vector_resources( db: Arc, @@ -820,118 +925,93 @@ impl JobManager { save_to_vector_fs_folder: Option, generator: RemoteEmbeddingGenerator, ) -> Result<(), LLMProviderError> { + eprintln!("full_job: {:?}", full_job); + eprintln!("job_message: {:?}", job_message); + if !job_message.files_inbox.is_empty() { shinkai_log( ShinkaiLogOption::JobExecution, ShinkaiLogLevel::Debug, format!("Processing files_map: ... files: {}", job_message.files_inbox.len()).as_str(), ); - eprintln!("Processing files_map: ... files: {}", job_message.files_inbox.len()); - { - // Get the files from the DB - let files = { - let files_result = vector_fs.db.get_all_files_from_inbox(job_message.files_inbox.clone()); - // Check if there was an error getting the files - match files_result { - Ok(files) => files, - Err(e) => return Err(LLMProviderError::VectorFS(e)), - } - }; - // Print out all the files - for (filename, _) in &files { - shinkai_log( - ShinkaiLogOption::JobExecution, - ShinkaiLogLevel::Debug, - &format!("File found: {}", filename), - ); - eprintln!("File found: {}", filename); + // Get the files from the DB + let files = { + let files_result = vector_fs.db.get_all_files_from_inbox(job_message.files_inbox.clone()); + match files_result { + Ok(files) => files, + Err(e) => return Err(LLMProviderError::VectorFS(e)), } - } - // TODO: later we should able to grab errors and return them to the user - let new_scope_entries_result = JobManager::process_files_inbox( - db.clone(), - vector_fs.clone(), + }; + + // Process the files and update the job scope + Self::process_files_and_update_scope( + db, + vector_fs, + files, agent_found, - job_message.files_inbox.clone(), + full_job, profile, save_to_vector_fs_folder, generator, ) - .await; - - match new_scope_entries_result { - Ok(new_scope_entries) => { - for (_, value) in new_scope_entries { - match value { - ScopeEntry::LocalScopeVRKai(local_entry) => { - if !full_job.scope.local_vrkai.contains(&local_entry) { - full_job.scope.local_vrkai.push(local_entry); - } else { - shinkai_log( - ShinkaiLogOption::JobExecution, - ShinkaiLogLevel::Error, - "Duplicate LocalScopeVRKaiEntry detected", - ); - } - } - ScopeEntry::LocalScopeVRPack(local_entry) => { - if !full_job.scope.local_vrpack.contains(&local_entry) { - full_job.scope.local_vrpack.push(local_entry); - } else { - shinkai_log( - ShinkaiLogOption::JobExecution, - ShinkaiLogLevel::Error, - "Duplicate LocalScopeVRPackEntry detected", - ); - } - } - ScopeEntry::VectorFSItem(fs_entry) => { - if !full_job.scope.vector_fs_items.contains(&fs_entry) { - full_job.scope.vector_fs_items.push(fs_entry); - } else { - shinkai_log( - ShinkaiLogOption::JobExecution, - ShinkaiLogLevel::Error, - "Duplicate VectorFSScopeEntry detected", - ); - } - } - ScopeEntry::VectorFSFolder(fs_entry) => { - if !full_job.scope.vector_fs_folders.contains(&fs_entry) { - full_job.scope.vector_fs_folders.push(fs_entry); - } else { - shinkai_log( - ShinkaiLogOption::JobExecution, - ShinkaiLogLevel::Error, - "Duplicate VectorFSScopeEntry detected", - ); - } - } - ScopeEntry::NetworkFolder(nf_entry) => { - if !full_job.scope.network_folders.contains(&nf_entry) { - full_job.scope.network_folders.push(nf_entry); - } else { - shinkai_log( - ShinkaiLogOption::JobExecution, - ShinkaiLogLevel::Error, - "Duplicate VectorFSScopeEntry detected", - ); - } - } - } - } - db.update_job_scope(full_job.job_id().to_string(), full_job.scope.clone())?; - } - Err(e) => { - shinkai_log( - ShinkaiLogOption::JobExecution, - ShinkaiLogLevel::Error, - format!("Error processing files: {}", e).as_str(), - ); - return Err(e); + .await?; + } + + Ok(()) + } + + /// Processes the specified files into Vector Resources. + #[allow(clippy::too_many_arguments)] + pub async fn process_specified_files_for_vector_resources( + db: Arc, + vector_fs: Arc, + files_inbox: String, + file_names: Vec, + agent_found: Option, + full_job: &mut Job, + profile: ShinkaiName, + save_to_vector_fs_folder: Option, + generator: RemoteEmbeddingGenerator, + ) -> Result<(), LLMProviderError> { + eprintln!("full_job: {:?}", full_job); + eprintln!("files_inbox: {:?}", files_inbox); + eprintln!("file_names: {:?}", file_names); + + if !file_names.is_empty() { + shinkai_log( + ShinkaiLogOption::JobExecution, + ShinkaiLogLevel::Debug, + format!("Processing specified files: {:?}", file_names).as_str(), + ); + + // Get the files from the DB + let files = { + let files_result = vector_fs.db.get_all_files_from_inbox(files_inbox.clone()); + match files_result { + Ok(files) => files, + Err(e) => return Err(LLMProviderError::VectorFS(e)), } - } + }; + + // Filter files based on the provided file names + let specified_files: Vec<(String, Vec)> = files + .into_iter() + .filter(|(name, _)| file_names.contains(name)) + .collect(); + + // Process the specified files and update the job scope + Self::process_files_and_update_scope( + db, + vector_fs, + specified_files, + agent_found, + full_job, + profile, + save_to_vector_fs_folder, + generator, + ) + .await?; } Ok(()) @@ -984,10 +1064,10 @@ impl JobManager { /// Else, the files will be returned as LocalScopeEntries and thus held inside. #[allow(clippy::too_many_arguments)] pub async fn process_files_inbox( - db: Arc, - vector_fs: Arc, + _db: Arc, + _vector_fs: Arc, agent: Option, - files_inbox: String, + files: Vec<(String, Vec)>, _profile: ShinkaiName, save_to_vector_fs_folder: Option, generator: RemoteEmbeddingGenerator, @@ -995,16 +1075,6 @@ impl JobManager { // Create the RemoteEmbeddingGenerator instance let mut files_map: HashMap = HashMap::new(); - // Get the files from the DB - let files = { - let files_result = vector_fs.db.get_all_files_from_inbox(files_inbox.clone()); - // Check if there was an error getting the files - match files_result { - Ok(files) => files, - Err(e) => return Err(LLMProviderError::VectorFS(e)), - } - }; - // Filter out image files // TODO: Eventually we will add extra embeddings that support images let files: Vec<(String, Vec)> = files diff --git a/shinkai-bin/shinkai-node/src/llm_provider/execution/job_vector_search.rs b/shinkai-bin/shinkai-node/src/llm_provider/execution/job_vector_search.rs index 93e38b287..9582ad53a 100644 --- a/shinkai-bin/shinkai-node/src/llm_provider/execution/job_vector_search.rs +++ b/shinkai-bin/shinkai-node/src/llm_provider/execution/job_vector_search.rs @@ -390,40 +390,4 @@ impl JobManager { 3 } } - - /// If include_description is true then adds the description of the Vector Resource - /// that the top scored retrieved node is from, by prepending a fake RetrievedNode - /// with the description inside. Removes the lowest scored node to preserve list length. - async fn generate_description_retrieved_node( - include_description: bool, - top_node: RetrievedNode, - resources: &[BaseVectorResource], - ) -> Vec { - let mut new_nodes = vec![]; - - if include_description { - let resource_header = top_node.resource_header.clone(); - - // Iterate through resources until we find one with a matching resource reference string - for resource in resources { - if resource.as_trait_object().generate_resource_header().reference_string() - == resource_header.reference_string() - { - if let Some(description) = resource.as_trait_object().description() { - let description_node = RetrievedNode::new( - Node::new_text(String::new(), description.to_string(), None, &vec![]), - 1.0_f32, - resource_header, - top_node.retrieval_path.clone(), - ); - new_nodes.insert(0, description_node); - new_nodes.pop(); // Remove the last element to maintain the same length - } - break; - } - } - } - - new_nodes - } } diff --git a/shinkai-bin/shinkai-node/src/llm_provider/execution/prompts/general_prompts.rs b/shinkai-bin/shinkai-node/src/llm_provider/execution/prompts/general_prompts.rs index 2d04b313d..5d047f4e2 100644 --- a/shinkai-bin/shinkai-node/src/llm_provider/execution/prompts/general_prompts.rs +++ b/shinkai-bin/shinkai-node/src/llm_provider/execution/prompts/general_prompts.rs @@ -6,6 +6,7 @@ use shinkai_message_primitives::schemas::{ pub struct JobPromptGenerator {} impl JobPromptGenerator { + // TODO: needs urgent fix pub fn simple_doc_description(nodes: Vec) -> Prompt { let mut prompt = Prompt::new(); prompt.add_content( diff --git a/shinkai-libs/shinkai-sheet/src/sheet.rs b/shinkai-libs/shinkai-sheet/src/sheet.rs index 6735258d8..e03a7de76 100644 --- a/shinkai-libs/shinkai-sheet/src/sheet.rs +++ b/shinkai-libs/shinkai-sheet/src/sheet.rs @@ -481,10 +481,11 @@ impl Sheet { if let Some(cell) = self.get_cell(row.clone(), col_uuid.clone()) { if let Some(value) = &cell.value { // Assuming the value is a serialized list of file names - let file_names: Vec = serde_json::from_str(value).unwrap_or_default(); - for file_name in file_names { - uploaded_files.push((file_inbox_id.clone(), file_name)); - } + // TODO: eventually if we want to support multiple files, we need to change this + // let file_names: Vec = serde_json::from_str(value).unwrap_or_default(); + // for file_name in file_names { + uploaded_files.push((file_inbox_id.clone(), value.clone())); + // } } } }