diff --git a/crates/core/src/dvm/customer.rs b/crates/core/src/dvm/customer.rs index 632a505..c7a1cc6 100644 --- a/crates/core/src/dvm/customer.rs +++ b/crates/core/src/dvm/customer.rs @@ -1,8 +1,10 @@ +use std::sync::Arc; use std::time::Duration; use log::{debug, error, info}; use nostr_sdk::prelude::*; use thiserror::Error; +use tokio::sync::Mutex; use tokio::time::timeout; use crate::config::Settings; @@ -138,11 +140,15 @@ impl Customer { let job_id = job_id.to_string(); let subscription_id = subscription_id.clone(); + let result = Arc::new(Mutex::new(None)); + let result_clone = Arc::clone(&result); + // Handle incoming Nostr notifications self.nostr_client - .handle_notifications(|notification| { + .handle_notifications(move |notification| { let job_id = job_id.clone(); let subscription_id = subscription_id.clone(); + let result = Arc::clone(&result_clone); async move { if let RelayPoolNotification::Event { subscription_id: sub_id, @@ -151,10 +157,12 @@ impl Customer { } = notification { if sub_id == subscription_id { - if let Ok(result) = + if let Ok(job_result) = serde_json::from_str::<GenerateZKPJobResult>(&event.content) { - if result.job_id == job_id { + if job_result.job_id == job_id { + let mut result_guard = result.lock().await; + *result_guard = Some(event.content.clone()); return Ok(true); } } @@ -166,28 +174,14 @@ impl Customer { .await .map_err(CustomerError::NostrClientError)?; - let filter = Filter::new() - .kind(Kind::Custom(JOB_RESULT_KIND)) - .author(PublicKey::from_bech32(&self.settings.prover_agent_pk).unwrap()) - .since(Timestamp::now() - Duration::from_secs(60)); - - // Fetch recent events to find the job result - let events = self - .nostr_client - .get_events_of(vec![filter], None) - .await - .map_err(CustomerError::NostrClientError)?; - - // Find and return the matching job result - for event in events { - if let Ok(job_result) = serde_json::from_str::<GenerateZKPJobResult>(&event.content) { - if job_result.job_id == job_id { - return Ok(job_result); - } - } + // Check if we 
found a result + let result_guard = result.lock().await; + if let Some(job_result) = result_guard.clone() { + // Convert the string to a GenerateZKPJobResult + Ok(serde_json::from_str(&job_result).unwrap()) + } else { + Err(CustomerError::Unknown("Job result not found".to_string())) } - - Err(CustomerError::Unknown("Job result not found".to_string())) } /// Verifies the proof in a job result diff --git a/scripts/utils/concatenate_files.py b/scripts/utils/concatenate_files.py new file mode 100644 index 0000000..0438797 --- /dev/null +++ b/scripts/utils/concatenate_files.py @@ -0,0 +1,42 @@ +import os +import shutil +import argparse + +def concatenate_files(root_dir, extension, output_file): + """ + Concatenates all files with a given extension in a directory and its subdirectories into a single file. + Includes the full path of each file before appending its content, prefixed with '//' as a comment. + + Args: + root_dir (str): The root directory to search for files. + extension (str): The file extension to look for. + output_file (str): The path to the output file where the content will be concatenated. 
+ """ + with open(output_file, 'wb') as outfile: + # Walk through all directories and files in the root directory + for dirpath, dirnames, filenames in os.walk(root_dir): + for filename in filenames: + # Check if the file ends with the given extension + if filename.endswith(extension): + file_path = os.path.join(dirpath, filename) + # Write the full file path as a comment before the content + outfile.write(f"// {file_path}\n".encode()) + # Open each file in binary mode and append its content to the output file + with open(file_path, 'rb') as infile: + shutil.copyfileobj(infile, outfile) + # Ensure there is a newline after each file's content (optional, for readability) + outfile.write(b'\n') + print(f"Appended {file_path} to {output_file}") + +def main(): + parser = argparse.ArgumentParser(description='Concatenate files with a specific extension from a directory into a single file.') + parser.add_argument('root_dir', type=str, help='The root directory to search for files') + parser.add_argument('extension', type=str, help='The file extension to look for') + parser.add_argument('output_file', type=str, help='The file where the content will be concatenated') + + args = parser.parse_args() + + concatenate_files(args.root_dir, args.extension, args.output_file) + +if __name__ == '__main__': + main()