Skip to content

Commit

Permalink
Fix sending SyncMessage to self
Browse files Browse the repository at this point in the history
  • Loading branch information
gferon committed Nov 19, 2023
1 parent 88d9680 commit 0ee1445
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 21 deletions.
16 changes: 16 additions & 0 deletions libsignal-service/src/proto.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#![allow(clippy::all)]

use rand::{Rng, RngCore};
include!(concat!(env!("OUT_DIR"), "/signalservice.rs"));
include!(concat!(env!("OUT_DIR"), "/signal.rs"));

Expand Down Expand Up @@ -65,3 +67,17 @@ impl WebSocketResponseMessage {
}
}
}

impl SyncMessage {
pub fn with_padding() -> Self {
let mut rng = rand::thread_rng();
let random_size = rng.gen_range(1..=512);
let mut padding: Vec<u8> = vec![0; random_size];
rng.fill_bytes(&mut padding);

Self {
padding: Some(padding),
..Default::default()
}
}
}
32 changes: 15 additions & 17 deletions libsignal-service/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,34 +290,38 @@ where
pub async fn send_message(
&mut self,
recipient: &ServiceAddress,
unidentified_access: Option<UnidentifiedAccess>,
mut unidentified_access: Option<UnidentifiedAccess>,
message: impl Into<ContentBody>,
timestamp: u64,
online: bool,
) -> SendMessageResult {
let content_body = message.into();

use crate::proto::data_message::Flags;

let end_session = match &content_body {
ContentBody::DataMessage(message) => {
unidentified_access.take(); // don't send end session as sealed sender
message.flags == Some(Flags::EndSession as u32)
},
_ => false,
};

// we never send sync messages or to our own account as sealed sender
if recipient == &self.local_address
|| matches!(&content_body, ContentBody::SynchronizeMessage(_))
{
unidentified_access.take();
}

// try to send the original message to all the recipient's devices
let mut results = vec![
self.try_send_message(
*recipient,
if !end_session {
unidentified_access.as_ref()
} else {
None
},
unidentified_access.as_ref(),
&content_body,
timestamp,
online,
false,
)
.await,
];
Expand All @@ -342,7 +346,6 @@ where
&sync_message,
timestamp,
false,
true,
)
.await?;
},
Expand Down Expand Up @@ -383,7 +386,6 @@ where
&content_body,
timestamp,
online,
false,
)
.await;

Expand Down Expand Up @@ -420,7 +422,6 @@ where
&sync_message,
timestamp,
false,
true,
)
.await;

Expand All @@ -439,7 +440,6 @@ where
content_body: &ContentBody,
timestamp: u64,
online: bool,
is_sync_message: bool,
) -> SendMessageResult {
use prost::Message;

Expand All @@ -453,7 +453,6 @@ where
&recipient,
unidentified_access.map(|x| &x.certificate),
&content_bytes,
is_sync_message,
)
.await?;

Expand Down Expand Up @@ -596,7 +595,7 @@ where
blob: Some(ptr),
complete: Some(complete),
}),
..Default::default()
..SyncMessage::with_padding()
};

self.send_message(
Expand All @@ -617,7 +616,6 @@ where
recipient: &ServiceAddress,
unidentified_access: Option<&SenderCertificate>,
content: &[u8],
is_sync_message: bool,
) -> Result<Vec<OutgoingPushMessage>, MessageSenderError> {
let mut messages = vec![];

Expand All @@ -632,8 +630,8 @@ where
// always send to the primary device no matter what
devices.insert(DEFAULT_DEVICE_ID.into());

// when sending a sync message, we always send it to all devices except the current one
if is_sync_message {
// when sending an identified message, remove ourselves from the list of recipients
if unidentified_access.is_none() {
devices.remove(&self.device_id);
}

Expand Down Expand Up @@ -786,7 +784,7 @@ where
unidentified_status,
..Default::default()
}),
..Default::default()
..SyncMessage::with_padding()
})
}
}
9 changes: 5 additions & 4 deletions libsignal-service/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl<WS: WebSocketService> SignalWebSocketProcess<WS> {
frame: Bytes,
) -> Result<(), ServiceError> {
let msg = WebSocketMessage::decode(frame)?;
log::trace!("Decoded {:?}", msg);
log::trace!("decoded {:?}", msg);

use web_socket_message::Type;
match (msg.r#type(), msg.request, msg.response) {
Expand Down Expand Up @@ -184,7 +184,7 @@ impl<WS: WebSocketService> SignalWebSocketProcess<WS> {
.filter(|x| !self.outgoing_request_map.contains_key(x))
.unwrap_or_else(|| self.next_request_id()),
);
log::trace!("Sending request {:?}", request);
log::trace!("sending request {:?}", request);

self.outgoing_request_map.insert(request.id.unwrap(), responder);
let msg = WebSocketMessage {
Expand Down Expand Up @@ -227,7 +227,8 @@ impl<WS: WebSocketService> SignalWebSocketProcess<WS> {
self.ws.send_message(buffer.into()).await?
}
Some(WebSocketStreamItem::KeepAliveRequest) => {
log::debug!("keep alive is disabled: ignoring request");
log::trace!("keep-alive is disabled");
continue;
}
None => {
return Err(ServiceError::WsError {
Expand All @@ -239,7 +240,7 @@ impl<WS: WebSocketService> SignalWebSocketProcess<WS> {
response = self.outgoing_responses.next() => {
match response {
Some(Ok(response)) => {
log::trace!("Sending response {:?}", response);
log::trace!("sending response {:?}", response);

let msg = WebSocketMessage {
r#type: Some(web_socket_message::Type::Response.into()),
Expand Down

0 comments on commit 0ee1445

Please sign in to comment.