From d2d075da898251bfb36afbc6caadf376e181c171 Mon Sep 17 00:00:00 2001 From: Jorge Antonio Date: Wed, 18 Dec 2024 09:09:17 +0000 Subject: [PATCH] refactor code --- atoma-bin/atoma_node.rs | 22 +++++++------ atoma-confidential/src/service.rs | 51 +++++++++++++++++++++++++++++-- 2 files changed, 62 insertions(+), 11 deletions(-) diff --git a/atoma-bin/atoma_node.rs b/atoma-bin/atoma_node.rs index e19e5f91..b476eed5 100644 --- a/atoma-bin/atoma_node.rs +++ b/atoma-bin/atoma_node.rs @@ -227,17 +227,21 @@ async fn main() -> Result<()> { let (compute_shared_secret_sender, compute_shared_secret_receiver) = tokio::sync::mpsc::unbounded_channel(); - let confidential_compute_service = AtomaConfidentialComputeService::new( - client.clone(), - subscriber_confidential_compute_receiver, - app_state_decryption_receiver, - app_state_encryption_receiver, - compute_shared_secret_receiver, - shutdown_receiver.clone(), - )?; + let client_clone = client.clone(); + let shutdown_receiver_clone = shutdown_receiver.clone(); spawn_with_shutdown( - async move { confidential_compute_service.run().await }, + async move { + AtomaConfidentialComputeService::start_confidential_compute_service( + client_clone, + subscriber_confidential_compute_receiver, + app_state_decryption_receiver, + app_state_encryption_receiver, + compute_shared_secret_receiver, + shutdown_receiver_clone, + ) + .await + }, shutdown_sender.clone(), ); diff --git a/atoma-confidential/src/service.rs b/atoma-confidential/src/service.rs index 632971df..5bf63354 100644 --- a/atoma-confidential/src/service.rs +++ b/atoma-confidential/src/service.rs @@ -81,6 +81,54 @@ impl AtomaConfidentialComputeService { shutdown_signal, }) } + + /// Initializes and starts the confidential compute service. + /// + /// This method performs the following steps: + /// 1. Creates a new service instance + /// 2. Submits an initial node key rotation attestation + /// 3. Starts the main service event loop + /// + /// # Arguments + /// * `sui_client` - Arc-wrapped RwLock containing the Sui blockchain client + /// * `event_receiver` - Channel receiver for Atoma events + /// * `service_decryption_receiver` - Channel receiver for decryption requests + /// * `service_encryption_receiver` - Channel receiver for encryption requests + /// * `service_shared_secret_receiver` - Channel receiver for shared secret computation requests + /// * `shutdown_signal` - Watch channel receiver for coordinating service shutdown + /// + /// # Returns + /// * `Ok(())` if the service starts and runs successfully + /// * `Err(AtomaConfidentialComputeError)` if initialization, attestation, or running fails + /// + /// # Errors + /// This function can return: + /// * `AtomaConfidentialComputeError::KeyManagementError` if key initialization fails + /// * `AtomaConfidentialComputeError::SuiClientError` if attestation submission fails + #[instrument(level = "info", skip_all)] + pub async fn start_confidential_compute_service( + sui_client: Arc>, + event_receiver: UnboundedReceiver, + service_decryption_receiver: UnboundedReceiver, + service_encryption_receiver: UnboundedReceiver, + service_shared_secret_receiver: UnboundedReceiver, + shutdown_signal: tokio::sync::watch::Receiver, + ) -> Result<()> { + let mut service = Self::new( + sui_client, + event_receiver, + service_decryption_receiver, + service_encryption_receiver, + service_shared_secret_receiver, + shutdown_signal, + )?; + + // NOTE: Submit the first node key rotation attestation, because the node is starting up afresh + service.submit_node_key_rotation_tdx_attestation().await?; + service.run().await?; + + Ok(()) + } /// Returns the current public key used by the confidential compute service /// @@ -136,8 +184,7 @@ impl AtomaConfidentialComputeService { "Running confidential compute service, with dh public key: {:?}", self.key_manager.get_public_key().as_bytes() ); - // Submit the first node key rotation attestation, because the node is starting up afresh - self.submit_node_key_rotation_tdx_attestation().await?; + loop { tokio::select! { Some((decryption_request, sender)) = self.service_decryption_receiver.recv() => {