Skip to content

Commit

Permalink
Merge pull request #611 from dcSpark/nico/fix_sheet_fiel_columns
Browse files Browse the repository at this point in the history
fix sheet files columns
  • Loading branch information
nicarq authored Oct 17, 2024
2 parents be035c5 + f538ff3 commit 066b3ab
Show file tree
Hide file tree
Showing 5 changed files with 225 additions and 180 deletions.
326 changes: 198 additions & 128 deletions shinkai-bin/shinkai-node/src/llm_provider/execution/job_execution_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,8 @@ impl JobManager {
&format!("Inference chain - Processing Job: {:?}", full_job.job_id),
);

eprintln!("Full job: {:?}", full_job);

// Retrieve image files from the message
// Note: this could be other type of files later on e.g. video, audio, etc.
let image_files = JobManager::get_image_files_from_message(vector_fs.clone(), &job_message).await?;
Expand Down Expand Up @@ -677,21 +679,26 @@ impl JobManager {
// Create a mutable copy of full_job
let mut mutable_job = full_job.clone();

// TODO: fix this once we have uploaded files
// // Update the job scope based on the ProcessedInput
// for (file_path, file_name) in &input_string.local_files {
// let local_entry = LocalScopeVRKaiEntry {
// vrkai: VRKai::new(
// BaseVectorResource::Document(Document::new(
// file_name.clone(),
// VRSourceReference::LocalFile(file_path.clone()),
// vec![],
// )),
// vec![],
// ),
// };
// mutable_job.scope.local_vrkai.push(local_entry);
// }
if input_string.uploaded_files.len() > 0 {
// Decompose the uploaded_files into two separate vectors
let (files_inbox, file_names): (Vec<String>, Vec<String>) =
input_string.uploaded_files.iter().cloned().unzip();

Self::process_specified_files_for_vector_resources(
db.clone(),
vector_fs.clone(),
files_inbox.first().unwrap().clone(),
file_names,
None,
&mut mutable_job,
user_profile.clone(),
None,
generator.clone(),
)
.await?;
}

eprintln!("full_job: {:?}", mutable_job);

for (local_file_path, local_file_name) in &input_string.local_files {
let vector_fs_entry = VectorFSItemScopeEntry {
Expand Down Expand Up @@ -807,8 +814,106 @@ impl JobManager {
}
}

/// Processes the files sent together with the current job_message into Vector Resources,
/// and saves them either into the local job scope, or the DB depending on `save_to_db_directly`.
/// Helper function to process files and update the job scope.
async fn process_files_and_update_scope(
db: Arc<ShinkaiDB>,
vector_fs: Arc<VectorFS>,
files: Vec<(String, Vec<u8>)>,
agent_found: Option<SerializedLLMProvider>,
full_job: &mut Job,
profile: ShinkaiName,
save_to_vector_fs_folder: Option<VRPath>,
generator: RemoteEmbeddingGenerator,
) -> Result<(), LLMProviderError> {
// Process the files
let new_scope_entries_result = JobManager::process_files_inbox(
db.clone(),
vector_fs.clone(),
agent_found,
files,
profile,
save_to_vector_fs_folder,
generator,
)
.await;

match new_scope_entries_result {
Ok(new_scope_entries) => {
for (_, value) in new_scope_entries {
match value {
ScopeEntry::LocalScopeVRKai(local_entry) => {
if !full_job.scope.local_vrkai.contains(&local_entry) {
full_job.scope.local_vrkai.push(local_entry);
} else {
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Error,
"Duplicate LocalScopeVRKaiEntry detected",
);
}
}
ScopeEntry::LocalScopeVRPack(local_entry) => {
if !full_job.scope.local_vrpack.contains(&local_entry) {
full_job.scope.local_vrpack.push(local_entry);
} else {
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Error,
"Duplicate LocalScopeVRPackEntry detected",
);
}
}
ScopeEntry::VectorFSItem(fs_entry) => {
if !full_job.scope.vector_fs_items.contains(&fs_entry) {
full_job.scope.vector_fs_items.push(fs_entry);
} else {
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Error,
"Duplicate VectorFSScopeEntry detected",
);
}
}
ScopeEntry::VectorFSFolder(fs_entry) => {
if !full_job.scope.vector_fs_folders.contains(&fs_entry) {
full_job.scope.vector_fs_folders.push(fs_entry);
} else {
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Error,
"Duplicate VectorFSScopeEntry detected",
);
}
}
ScopeEntry::NetworkFolder(nf_entry) => {
if !full_job.scope.network_folders.contains(&nf_entry) {
full_job.scope.network_folders.push(nf_entry);
} else {
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Error,
"Duplicate VectorFSScopeEntry detected",
);
}
}
}
}
db.update_job_scope(full_job.job_id().to_string(), full_job.scope.clone())?;
}
Err(e) => {
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Error,
format!("Error processing files: {}", e).as_str(),
);
return Err(e);
}
}

Ok(())
}

/// Processes the files sent together with the current job_message into Vector Resources.
#[allow(clippy::too_many_arguments)]
pub async fn process_job_message_files_for_vector_resources(
db: Arc<ShinkaiDB>,
Expand All @@ -820,118 +925,93 @@ impl JobManager {
save_to_vector_fs_folder: Option<VRPath>,
generator: RemoteEmbeddingGenerator,
) -> Result<(), LLMProviderError> {
eprintln!("full_job: {:?}", full_job);
eprintln!("job_message: {:?}", job_message);

if !job_message.files_inbox.is_empty() {
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Debug,
format!("Processing files_map: ... files: {}", job_message.files_inbox.len()).as_str(),
);
eprintln!("Processing files_map: ... files: {}", job_message.files_inbox.len());
{
// Get the files from the DB
let files = {
let files_result = vector_fs.db.get_all_files_from_inbox(job_message.files_inbox.clone());
// Check if there was an error getting the files
match files_result {
Ok(files) => files,
Err(e) => return Err(LLMProviderError::VectorFS(e)),
}
};

// Print out all the files
for (filename, _) in &files {
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Debug,
&format!("File found: {}", filename),
);
eprintln!("File found: {}", filename);
// Get the files from the DB
let files = {
let files_result = vector_fs.db.get_all_files_from_inbox(job_message.files_inbox.clone());
match files_result {
Ok(files) => files,
Err(e) => return Err(LLMProviderError::VectorFS(e)),
}
}
// TODO: later we should able to grab errors and return them to the user
let new_scope_entries_result = JobManager::process_files_inbox(
db.clone(),
vector_fs.clone(),
};

// Process the files and update the job scope
Self::process_files_and_update_scope(
db,
vector_fs,
files,
agent_found,
job_message.files_inbox.clone(),
full_job,
profile,
save_to_vector_fs_folder,
generator,
)
.await;

match new_scope_entries_result {
Ok(new_scope_entries) => {
for (_, value) in new_scope_entries {
match value {
ScopeEntry::LocalScopeVRKai(local_entry) => {
if !full_job.scope.local_vrkai.contains(&local_entry) {
full_job.scope.local_vrkai.push(local_entry);
} else {
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Error,
"Duplicate LocalScopeVRKaiEntry detected",
);
}
}
ScopeEntry::LocalScopeVRPack(local_entry) => {
if !full_job.scope.local_vrpack.contains(&local_entry) {
full_job.scope.local_vrpack.push(local_entry);
} else {
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Error,
"Duplicate LocalScopeVRPackEntry detected",
);
}
}
ScopeEntry::VectorFSItem(fs_entry) => {
if !full_job.scope.vector_fs_items.contains(&fs_entry) {
full_job.scope.vector_fs_items.push(fs_entry);
} else {
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Error,
"Duplicate VectorFSScopeEntry detected",
);
}
}
ScopeEntry::VectorFSFolder(fs_entry) => {
if !full_job.scope.vector_fs_folders.contains(&fs_entry) {
full_job.scope.vector_fs_folders.push(fs_entry);
} else {
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Error,
"Duplicate VectorFSScopeEntry detected",
);
}
}
ScopeEntry::NetworkFolder(nf_entry) => {
if !full_job.scope.network_folders.contains(&nf_entry) {
full_job.scope.network_folders.push(nf_entry);
} else {
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Error,
"Duplicate VectorFSScopeEntry detected",
);
}
}
}
}
db.update_job_scope(full_job.job_id().to_string(), full_job.scope.clone())?;
}
Err(e) => {
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Error,
format!("Error processing files: {}", e).as_str(),
);
return Err(e);
.await?;
}

Ok(())
}

/// Processes the specified files into Vector Resources.
#[allow(clippy::too_many_arguments)]
pub async fn process_specified_files_for_vector_resources(
db: Arc<ShinkaiDB>,
vector_fs: Arc<VectorFS>,
files_inbox: String,
file_names: Vec<String>,
agent_found: Option<SerializedLLMProvider>,
full_job: &mut Job,
profile: ShinkaiName,
save_to_vector_fs_folder: Option<VRPath>,
generator: RemoteEmbeddingGenerator,
) -> Result<(), LLMProviderError> {
eprintln!("full_job: {:?}", full_job);
eprintln!("files_inbox: {:?}", files_inbox);
eprintln!("file_names: {:?}", file_names);

if !file_names.is_empty() {
shinkai_log(
ShinkaiLogOption::JobExecution,
ShinkaiLogLevel::Debug,
format!("Processing specified files: {:?}", file_names).as_str(),
);

// Get the files from the DB
let files = {
let files_result = vector_fs.db.get_all_files_from_inbox(files_inbox.clone());
match files_result {
Ok(files) => files,
Err(e) => return Err(LLMProviderError::VectorFS(e)),
}
}
};

// Filter files based on the provided file names
let specified_files: Vec<(String, Vec<u8>)> = files
.into_iter()
.filter(|(name, _)| file_names.contains(name))
.collect();

// Process the specified files and update the job scope
Self::process_files_and_update_scope(
db,
vector_fs,
specified_files,
agent_found,
full_job,
profile,
save_to_vector_fs_folder,
generator,
)
.await?;
}

Ok(())
Expand Down Expand Up @@ -984,27 +1064,17 @@ impl JobManager {
/// Else, the files will be returned as LocalScopeEntries and thus held inside.
#[allow(clippy::too_many_arguments)]
pub async fn process_files_inbox(
db: Arc<ShinkaiDB>,
vector_fs: Arc<VectorFS>,
_db: Arc<ShinkaiDB>,
_vector_fs: Arc<VectorFS>,
agent: Option<SerializedLLMProvider>,
files_inbox: String,
files: Vec<(String, Vec<u8>)>,
_profile: ShinkaiName,
save_to_vector_fs_folder: Option<VRPath>,
generator: RemoteEmbeddingGenerator,
) -> Result<HashMap<String, ScopeEntry>, LLMProviderError> {
// Create the RemoteEmbeddingGenerator instance
let mut files_map: HashMap<String, ScopeEntry> = HashMap::new();

// Get the files from the DB
let files = {
let files_result = vector_fs.db.get_all_files_from_inbox(files_inbox.clone());
// Check if there was an error getting the files
match files_result {
Ok(files) => files,
Err(e) => return Err(LLMProviderError::VectorFS(e)),
}
};

// Filter out image files
// TODO: Eventually we will add extra embeddings that support images
let files: Vec<(String, Vec<u8>)> = files
Expand Down
Loading

0 comments on commit 066b3ab

Please sign in to comment.