diff --git a/documentation/docs/frequently-asked-questions.md b/documentation/docs/frequently-asked-questions.md index ed2507c9..7a13fa9a 100644 --- a/documentation/docs/frequently-asked-questions.md +++ b/documentation/docs/frequently-asked-questions.md @@ -233,10 +233,10 @@ onPressed: () async { ``` ```rust title="native/hub/src/sample_functions.rs" -pub async fn respond() -> Result<()> { +pub async fn respond() { use messages::tutorial_resource::*; - let receiver = MyUniqueInput::get_dart_signal_receiver()?; + let receiver = MyUniqueInput::get_dart_signal_receiver(); while let Some(dart_signal) = receiver.recv().await { let my_unique_input = dart_signal.message; MyUniqueOutput { @@ -245,8 +245,6 @@ pub async fn respond() -> Result<()> { } .send_signal_to_dart(); } - - Ok(()) } ``` diff --git a/documentation/docs/messaging.md b/documentation/docs/messaging.md index 1f85e31f..7673383f 100644 --- a/documentation/docs/messaging.md +++ b/documentation/docs/messaging.md @@ -68,7 +68,7 @@ MyDataInput( ... ).sendSignalToRust(); ``` ```rust title="Rust" -let receiver = MyDataInput::get_dart_signal_receiver()?; +let receiver = MyDataInput::get_dart_signal_receiver(); while let Some(dart_signal) = receiver.recv().await { let message: MyDataInput = dart_signal.message; // Custom Rust logic here @@ -88,7 +88,7 @@ MyDataInput( ... ).sendSignalToRust(binary); ``` ```rust title="Rust" -let receiver = MyDataInput::get_dart_signal_receiver()?; +let receiver = MyDataInput::get_dart_signal_receiver(); while let Some(dart_signal) = receiver.recv().await { let message: MyDataInput = dart_signal.message; let binary: Vec = dart_signal.binary; diff --git a/documentation/docs/tutorial.md b/documentation/docs/tutorial.md index 68d871a8..8a5db25c 100644 --- a/documentation/docs/tutorial.md +++ b/documentation/docs/tutorial.md @@ -62,10 +62,10 @@ use crate::common::*; use crate::messages; use rinf::debug_print; -pub async fn calculate_precious_data() -> Result<()> { +pub async fn calculate_precious_data() { use messages::tutorial_messages::*; - let receiver = MyPreciousData::get_dart_signal_receiver()?; // GENERATED + let receiver = MyPreciousData::get_dart_signal_receiver(); // GENERATED while let Some(dart_signal) = receiver.recv().await { let my_precious_data = dart_signal.message; @@ -79,8 +79,6 @@ pub async fn calculate_precious_data() -> Result<()> { debug_print!("{new_numbers:?}"); debug_print!("{new_string}"); } - - Ok(()) } ``` @@ -217,18 +215,16 @@ children: [ use crate::common::*; use crate::messages; -pub async fn tell_treasure() -> Result<()> { +pub async fn tell_treasure() { use messages::tutorial_messages::*; let mut current_value: i32 = 1; - let receiver = MyTreasureInput::get_dart_signal_receiver()?; // GENERATED + let receiver = MyTreasureInput::get_dart_signal_receiver(); // GENERATED while let Some(_) = receiver.recv().await { MyTreasureOutput { current_value }.send_signal_to_dart(); // GENERATED current_value += 1; } - - Ok(()) } ``` diff --git a/flutter_package/bin/src/message.dart b/flutter_package/bin/src/message.dart index 7cfbe841..14fe3fe9 100644 --- a/flutter_package/bin/src/message.dart +++ b/flutter_package/bin/src/message.dart @@ -272,7 +272,7 @@ use rinf::{ DartSignal, MessageReceiver, MessageSender, RinfError, }; -use std::sync::Mutex; +use std::sync::LazyLock; ''', atFront: true, @@ -289,38 +289,16 @@ use std::sync::Mutex; await insertTextToFile( rustPath, ''' -type ${messageName}Cell = Mutex>, - Option>>, -)>>; + MessageReceiver>, +)>; pub static ${snakeName.toUpperCase()}_CHANNEL: ${messageName}Cell = - Mutex::new(None); + LazyLock::new(message_channel); impl ${normalizePascal(messageName)} { - pub fn get_dart_signal_receiver() - -> Result>, RinfError> - { - let mut guard = ${snakeName.toUpperCase()}_CHANNEL - .lock() - .map_err(|_| RinfError::LockMessageChannel)?; - if guard.is_none() { - let (sender, receiver) = message_channel(); - guard.replace((sender, Some(receiver))); - } - let (mut sender, mut receiver_option) = guard - .take() - .ok_or(RinfError::NoMessageChannel)?; - // After Dart's hot restart or app reopen on mobile devices, - // a sender from the previous run already exists - // which is now closed. - if sender.is_closed() { - let receiver; - (sender, receiver) = message_channel(); - receiver_option = Some(receiver); - } - let receiver = receiver_option.ok_or(RinfError::MessageReceiverTaken)?; - guard.replace((sender, None)); - Ok(receiver) + pub fn get_dart_signal_receiver() -> MessageReceiver> { + ${snakeName.toUpperCase()}_CHANNEL.1.clone() } } ''', @@ -467,28 +445,7 @@ new_hash_map.insert( message, binary: binary.to_vec(), }; - let mut guard = ${snakeName.toUpperCase()}_CHANNEL - .lock() - .map_err(|_| RinfError::LockMessageChannel)?; - if guard.is_none() { - let (sender, receiver) = message_channel(); - guard.replace((sender, Some(receiver))); - } - let mut pair = guard - .as_ref() - .ok_or(RinfError::NoMessageChannel)?; - // After Dart's hot restart or app reopen on mobile devices, - // a sender from the previous run already exists - // which is now closed. - if pair.0.is_closed() { - let (sender, receiver) = message_channel(); - guard.replace((sender, Some(receiver))); - pair = guard - .as_ref() - .ok_or(RinfError::NoMessageChannel)?; - } - let sender = &pair.0; - let _ = sender.send(dart_signal); + ${snakeName.toUpperCase()}_CHANNEL.0.send(dart_signal); Ok(()) }), ); diff --git a/flutter_package/example/native/hub/src/sample_functions.rs b/flutter_package/example/native/hub/src/sample_functions.rs index ac304caa..202d2cb3 100644 --- a/flutter_package/example/native/hub/src/sample_functions.rs +++ b/flutter_package/example/native/hub/src/sample_functions.rs @@ -13,13 +13,13 @@ static IS_DEBUG_MODE: bool = true; static IS_DEBUG_MODE: bool = false; // Business logic for the counter widget. -pub async fn tell_numbers() -> Result<()> { +pub async fn tell_numbers() { use messages::counter_number::*; let mut vector = Vec::new(); // Stream getter is generated from a marked Protobuf message. - let receiver = SampleNumberInput::get_dart_signal_receiver()?; + let receiver = SampleNumberInput::get_dart_signal_receiver(); while let Some(dart_signal) = receiver.recv().await { // Extract values from the message received from Dart. // This message is a type that's declared in its Protobuf file. @@ -40,8 +40,6 @@ pub async fn tell_numbers() -> Result<()> { } .send_signal_to_dart(); } - - Ok(()) } // Business logic for the fractal image. @@ -104,15 +102,14 @@ pub async fn stream_fractal() { // A dummy function that uses sample messages to eliminate warnings. #[allow(dead_code)] -async fn use_messages() -> Result<()> { +async fn use_messages() { use messages::sample_folder::sample_file::*; - let _ = SampleInput::get_dart_signal_receiver()?; + let _ = SampleInput::get_dart_signal_receiver(); SampleOutput { kind: 3, oneof_input: Some(sample_output::OneofInput::Age(25)), } .send_signal_to_dart(); - Ok(()) } // Business logic for testing various crates. diff --git a/flutter_package/template/native/hub/src/lib.rs b/flutter_package/template/native/hub/src/lib.rs index 69e56e97..f5ea7dbf 100644 --- a/flutter_package/template/native/hub/src/lib.rs +++ b/flutter_package/template/native/hub/src/lib.rs @@ -28,18 +28,16 @@ async fn main() -> Result<()> { Ok(()) } -async fn communicate() -> Result<()> { +async fn communicate() { use messages::basic::*; // Send signals to Dart like below. SmallNumber { number: 7 }.send_signal_to_dart(); // Get receivers that listen to Dart signals like below. - let receiver = SmallText::get_dart_signal_receiver()?; + let receiver = SmallText::get_dart_signal_receiver(); while let Some(dart_signal) = receiver.recv().await { let message: SmallText = dart_signal.message; rinf::debug_print!("{message:?}"); } - - Ok(()) } diff --git a/rust_crate/src/channel.rs b/rust_crate/src/channel.rs index 6abc3301..7dada693 100644 --- a/rust_crate/src/channel.rs +++ b/rust_crate/src/channel.rs @@ -1,95 +1,66 @@ -use crate::error::RinfError; use std::collections::VecDeque; use std::future::Future; use std::pin::Pin; use std::sync::{Arc, Mutex}; use std::task::{Context, Poll, Waker}; +#[derive(Clone)] pub struct MessageSender { inner: Arc>>, } pub struct MessageReceiver { inner: Arc>>, + id: usize, // Each receiver has a unique ID } struct MessageChannel { - queue: VecDeque, // Message queue for storing multiple messages + queue: VecDeque, waker: Option, - sender_dropped: bool, // Track whether the sender has been dropped - receiver_dropped: bool, // Track whether the receiver has been dropped + active_receiver_id: usize, // Track the active receiver by ID } impl MessageSender { - // Send a message and store it in the queue - pub fn send(&self, msg: T) -> Result<(), RinfError> { - let mut inner = self - .inner - .lock() - .map_err(|_| RinfError::BrokenMessageChannel)?; - - // Return an error if the receiver has been dropped - if inner.receiver_dropped { - return Err(RinfError::ClosedMessageChannel); - } + pub fn send(&self, msg: T) { + let mut inner = match self.inner.lock() { + Ok(inner) => inner, + Err(_) => return, // Do not consider poisoned mutex + }; // Enqueue the message inner.queue.push_back(msg); if let Some(waker) = inner.waker.take() { - waker.wake(); // Wake the receiver if it's waiting - } - Ok(()) - } - - // Check if the receiver is still alive - pub fn is_closed(&self) -> bool { - let inner = self.inner.lock(); - match inner { - Ok(inner) => inner.receiver_dropped, - Err(_) => true, // If the lock is poisoned, consider it closed - } - } -} - -impl Drop for MessageSender { - fn drop(&mut self) { - let inner = self.inner.lock(); - if let Ok(mut inner) = inner { - // Mark that the sender has been dropped - inner.sender_dropped = true; - if let Some(waker) = inner.waker.take() { - waker.wake(); // Wake the receiver in case it's waiting - } + waker.wake(); } } } impl MessageReceiver { - // Receive the next message from the queue asynchronously pub async fn recv(&self) -> Option { RecvFuture { inner: self.inner.clone(), + receiver_id: self.id, // Pass the receiver's ID to the future } .await } } -impl Drop for MessageReceiver { - fn drop(&mut self) { - let inner = self.inner.lock(); - if let Ok(mut inner) = inner { - // Mark that the receiver has been dropped - inner.receiver_dropped = true; - if let Some(waker) = inner.waker.take() { - waker.wake(); // Wake any waiting sender - } - } +// Automatically make the cloned receiver the active one +impl Clone for MessageReceiver { + fn clone(&self) -> Self { + let mut inner = self.inner.lock().unwrap(); + let new_receiver = MessageReceiver { + inner: self.inner.clone(), + id: inner.active_receiver_id + 1, // Increment ID for new receiver + }; + inner.active_receiver_id = new_receiver.id; // Update active receiver + new_receiver } } -// Future implementation for receiving a message struct RecvFuture { inner: Arc>>, + receiver_id: usize, // Track which receiver is polling } impl Future for RecvFuture { @@ -101,18 +72,16 @@ impl Future for RecvFuture { Err(_) => return Poll::Ready(None), // Return None on poisoned mutex }; - // Check if there are any messages in the queue - if let Some(msg) = inner.queue.pop_front() { - return Poll::Ready(Some(msg)); // Return the next message - } - - // If no messages and the sender is dropped, return None - if inner.sender_dropped && inner.queue.is_empty() { - Poll::Ready(None) + // Only allow the current active receiver to receive messages + if inner.active_receiver_id == self.receiver_id { + if let Some(msg) = inner.queue.pop_front() { + Poll::Ready(Some(msg)) + } else { + inner.waker = Some(cx.waker().clone()); + Poll::Pending + } } else { - // Set the waker for later notification - inner.waker = Some(cx.waker().clone()); - Poll::Pending // No message available, wait + Poll::Ready(None) // Return None if this receiver is not the current active one } } } @@ -120,15 +89,20 @@ impl Future for RecvFuture { // Create the message channel with a message queue pub fn message_channel() -> (MessageSender, MessageReceiver) { let channel = Arc::new(Mutex::new(MessageChannel { - queue: VecDeque::new(), // Initialize an empty message queue + queue: VecDeque::new(), waker: None, - sender_dropped: false, // Initially, the sender is not dropped - receiver_dropped: false, // Initially, the receiver is not dropped + active_receiver_id: 0, // Start with receiver ID 0 })); + + let receiver = MessageReceiver { + inner: channel.clone(), + id: 0, + }; + ( MessageSender { inner: channel.clone(), }, - MessageReceiver { inner: channel }, + receiver, ) } diff --git a/rust_crate/src/error.rs b/rust_crate/src/error.rs index 0bb19662..ea528f57 100644 --- a/rust_crate/src/error.rs +++ b/rust_crate/src/error.rs @@ -7,11 +7,6 @@ pub enum RinfError { NoDartIsolate, LockShutdownReceiver, NoShutdownReceiver, - LockMessageChannel, - BrokenMessageChannel, - ClosedMessageChannel, - NoMessageChannel, - MessageReceiverTaken, DecodeMessage, NoSignalHandler, } @@ -31,21 +26,6 @@ impl fmt::Display for RinfError { RinfError::NoShutdownReceiver => { write!(f, "Shutdown receiver was not created.") } - RinfError::LockMessageChannel => { - write!(f, "Could not acquire the message channel lock.") - } - RinfError::BrokenMessageChannel => { - write!(f, "Message channel is broken.",) - } - RinfError::ClosedMessageChannel => { - write!(f, "Message channel is closed.",) - } - RinfError::NoMessageChannel => { - write!(f, "Message channel was not created.",) - } - RinfError::MessageReceiverTaken => { - write!(f, "Each Dart signal receiver can be taken only once.") - } RinfError::DecodeMessage => { write!(f, "Could not decode the message.") }