on_track not called even after answer contains a video m line #605

Closed
Advait1306 opened this issue Aug 23, 2024 · 1 comment

Comments

Advait1306 commented Aug 23, 2024

Not sure what I'm doing wrong here.

  • The offer contains the details of the track: I can see the m-line with the track id and stream id.
  • The answer contains a video m-line. In fact it contains two video m-lines, and I know one of them belongs to the monitor track because the answer shows only one m-line if I don't add the track.
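
One way to double-check the two points above is to dump the media sections of the offer and the answer and compare them. The following is a minimal std-only sketch, not part of webrtc-rs: `MediaSection` and `summarize_media_sections` are hypothetical names, the sample SDP is hand-written and heavily trimmed, and only the direction and `a=msid:` attributes are extracted.

```rust
/// Summary of one media section (m-line) in an SDP blob.
#[derive(Debug, PartialEq)]
struct MediaSection {
    kind: String,              // "audio", "video", "application", ...
    direction: Option<String>, // "sendrecv", "sendonly", "recvonly" or "inactive"
    msid: Option<String>,      // value of the `a=msid:` attribute, if present
}

/// Split an SDP string into its media sections and pull out the
/// attributes that matter when debugging a missing track.
fn summarize_media_sections(sdp: &str) -> Vec<MediaSection> {
    let mut sections: Vec<MediaSection> = Vec::new();
    for line in sdp.lines() {
        let line = line.trim();
        if let Some(rest) = line.strip_prefix("m=") {
            // The first token after "m=" is the media kind.
            let kind = rest.split_whitespace().next().unwrap_or("").to_string();
            sections.push(MediaSection { kind, direction: None, msid: None });
        } else if let Some(current) = sections.last_mut() {
            // Lines before the first m-line are session-level and are skipped.
            match line {
                "a=sendrecv" | "a=sendonly" | "a=recvonly" | "a=inactive" => {
                    current.direction = Some(line[2..].to_string());
                }
                _ => {
                    if let Some(msid) = line.strip_prefix("a=msid:") {
                        current.msid = Some(msid.to_string());
                    }
                }
            }
        }
    }
    sections
}

fn main() {
    // Hand-written, heavily trimmed SDP for illustration only.
    let sdp = "v=0\r\n\
               m=video 9 UDP/TLS/RTP/SAVPF 96\r\n\
               a=sendonly\r\n\
               a=msid:monitor-stream monitor\r\n\
               m=application 9 UDP/DTLS/SCTP webrtc-datachannel\r\n";
    for section in summarize_media_sections(sdp) {
        println!("{:?}", section);
    }
}
```

Running this over both the offer and the answer shows at a glance which video section carries the `msid` and which direction each side negotiated.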

Data channels are working, and on_data_channel fires whenever a new data channel is created.

I have tried everything at this point. Could this be a package issue, or am I running into firewall problems?

#[derive(Debug, Clone, Copy)]
pub enum PeerMode {
    Rover,
    Dex,
}

pub struct PeerConnector {
    peer_connection: Arc<RTCPeerConnection>,
    tx: Sender<SignalMessage>,
    rx: Receiver<SignalMessage>,
    mode: PeerMode,
    default_data_channel: Option<Arc<RTCDataChannel>>,
    stun_buffer: Vec<RTCIceCandidateInit>,
}

impl PeerConnector {
    pub async fn new(
        mode: PeerMode,
        rx: Receiver<SignalMessage>,
        tx: Sender<SignalMessage>,
    ) -> Self {
        let mut config = RTCConfiguration::default();

        config.ice_servers.append(&mut vec![RTCIceServer {
            urls: vec!["stun:stun.l.google.com:19302".to_string()],
            username: "".to_string(),
            credential: "".to_string(),
            credential_type: Default::default(),
        }]);

        // Create a MediaEngine object to configure the supported codec
        let mut m = MediaEngine::default();

        m.register_default_codecs().unwrap();

        // Create an InterceptorRegistry. This is the user-configurable RTP/RTCP pipeline.
        // It provides NACKs, RTCP reports and other features. If you use `webrtc.NewPeerConnection`
        // this is enabled by default. If you are managing the API manually, you MUST create an
        // InterceptorRegistry for each PeerConnection.
        let mut registry = Registry::new();

        // Use the default set of Interceptors
        registry = register_default_interceptors(registry, &mut m).unwrap();

        // Create the API object with the MediaEngine
        let api = APIBuilder::new()
            .with_media_engine(m)
            .with_interceptor_registry(registry)
            .build();

        // Create API for peer connection
        let peer_connection = Arc::new(api.new_peer_connection(config).await.unwrap());

        Self {
            peer_connection,
            rx,
            tx,
            mode,
            default_data_channel: None,
            stun_buffer: vec![],
        }
    }

    pub async fn connect(&mut self) -> (Arc<RTCPeerConnection>, Arc<RTCDataChannel>) {
        let tx = self.tx.clone();
        self.peer_connection.on_ice_candidate(Box::new(
            move |candidate: Option<RTCIceCandidate>| {
                let tx = tx.clone();
                Box::pin(async move {
                    if let Some(candidate) = candidate {
                        let string_candidate =
                            serde_json::to_string(&candidate.to_json().unwrap()).unwrap();
                        tx.send(SignalMessage::Stun {
                            url: string_candidate,
                        })
                        .await
                        .unwrap();
                    }
                })
            },
        ));

        // Dex: default_data_channel is received inside this callback. We use a channel to pass it to the main thread
        let (tx, mut rx) = tokio::sync::mpsc::channel::<Arc<RTCDataChannel>>(1);

        println!(
            "before addition len: {:?}",
            self.peer_connection.get_transceivers().await.len()
        );

        self.peer_connection
            .add_transceiver_from_kind(RTPCodecType::Video, None)
            .await
            .unwrap();

        self.peer_connection
            .add_transceiver_from_kind(RTPCodecType::Audio, None)
            .await
            .unwrap();

        println!(
            "after addition len: {:?}",
            self.peer_connection.get_transceivers().await.len()
        );

        self.peer_connection
            .on_data_channel(Box::new(move |channel| {
                let tx = tx.clone();
                Box::pin(async move {
                    tx.send(channel.clone()).await.unwrap();
                })
            }));

        self.peer_connection
            .on_track(Box::new(|track, _receiver, _| {
                Box::pin(async move {
                    println!("Track received: {:?}", track);
                })
            }));

        self.peer_connection.on_negotiation_needed(Box::new(|| {
            Box::pin(async {
                println!("Negotiation needed");
            })
        }));

        match self.mode {
            PeerMode::Rover => {
                self.create_offer().await;
            }
            PeerMode::Dex => {}
        }

        loop {
            tokio::select! {
                data = self.rx.recv() => {
                    if let Some(msg) = data {
                        match msg {
                            SignalMessage::Stun { url } => {
                                self.received_stun(url).await;
                            }
                            SignalMessage::Description { value } => match self.mode {
                                PeerMode::Rover => {
                                    self.received_answer(value).await;
                                }
                                PeerMode::Dex => {
                                    self.create_answer(value).await;
                                }
                            },
                            _ => {}
                        }
                    }
                }
                channel = rx.recv() => {
                    if let Some(channel) = channel {
                        self.default_data_channel = Some(channel);
                    }
                }
                _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
                    let state = self.peer_connection.connection_state();
                    println!("{:?}: Connection State: {:?}", self.mode, state);

                    match state {
                        RTCPeerConnectionState::Connected => {
                            match self.default_data_channel {
                                Some(_) => {
                                    println!("{:?}: Connected", self.mode);

                                    let peer_connection = self.peer_connection.clone();
                                    let default_data_channel = self.default_data_channel.clone().unwrap();

                                    // return (peer_connection, default_data_channel);
                                }
                                None => continue,
                            }
                        }
                        _ => {}
                    }
                }
            }
        }
    }

    async fn create_offer(&mut self) {
        // Create a new DataChannelInit object with your desired configuration
        let data_channel_init = RTCDataChannelInit {
            ordered: Some(true),   // Indicates whether data is delivered in order
            max_retransmits: None, // Maximum number of retransmissions
            ..Default::default()
        };

        // Create the DataChannel
        let data_channel = self
            .peer_connection
            .create_data_channel("default", Some(data_channel_init))
            .await
            .unwrap();

        println!(
            "before track len: {:?}",
            self.peer_connection.get_transceivers().await.len()
        );

        let track = Arc::new(TrackLocalStaticRTP::new(
            RTCRtpCodecCapability {
                mime_type: MIME_TYPE_H264.to_owned(),
                ..Default::default()
            },
            "monitor".to_string(),
            "monitor-stream".to_string(),
        ));

        let track2 = track.clone();
        tokio::spawn(async move {
            // Write to track
            let mut frame = vec![0u8; 1500];
            loop {
                track2
                    .write_rtp_with_extensions(
                        &rtp::packet::Packet {
                            header: Default::default(),
                            payload: frame.clone().into(),
                        },
                        &[],
                    )
                    .await
                    .unwrap();
            }
        });

        let sender = self.peer_connection.add_track(track).await.unwrap();

        tokio::spawn(async move {
            let mut rtcp_buf = vec![0u8; 1500];
            while let Ok((_, _)) = sender.read(&mut rtcp_buf).await {}
        });

        println!(
            "after track len: {:?}",
            self.peer_connection.get_transceivers().await.len()
        );

        self.default_data_channel = Some(data_channel.clone());

        println!("{:?}: Creating offer", self.mode);
        let offer = self.peer_connection.create_offer(None).await.unwrap();

        let mut gather_complete = self.peer_connection.gathering_complete_promise().await;

        self.peer_connection
            .set_local_description(offer)
            .await
            .unwrap();

        let _ = gather_complete.recv().await;

        let string_offer =
            serde_json::to_string(&(self.peer_connection.local_description().await.unwrap()));

        println!("{:?}: Sending offer", self.mode);
        self.tx
            .send(SignalMessage::Description {
                value: string_offer.unwrap(),
            })
            .await
            .unwrap();
    }

    async fn create_answer(&mut self, offer: String) {
        println!("{:?}: Received offer, creating answer", self.mode);

        let offer: RTCSessionDescription = serde_json::from_str(&offer).unwrap();
        println!("{:?}", offer.sdp);
        self.peer_connection
            .set_remote_description(offer)
            .await
            .unwrap();

        for candidate in self.stun_buffer.drain(..) {
            self.peer_connection
                .add_ice_candidate(candidate)
                .await
                .unwrap();
        }

        let answer = self.peer_connection.create_answer(None).await.unwrap();

        let clone_answer = answer.clone();

        self.peer_connection
            .set_local_description(clone_answer)
            .await
            .unwrap();

        let string_answer = serde_json::to_string(&(answer.clone())).unwrap();

        self.tx
            .send(SignalMessage::Description {
                value: string_answer,
            })
            .await
            .unwrap();
    }

    // Should only be called in Rover mode (the side that created the offer)
    async fn received_answer(&mut self, answer: String) {
        println!("{:?}: Received answer", self.mode);
        let answer: RTCSessionDescription = serde_json::from_str(&answer).unwrap();
        println!("{:?}", answer.sdp);
        self.peer_connection
            .set_remote_description(answer)
            .await
            .unwrap();
    }

    async fn received_stun(&mut self, stun: String) {
        println!("{:?}: Received stun", self.mode);
        let candidate_init: RTCIceCandidateInit = serde_json::from_str(&stun).unwrap();

        if self.peer_connection.remote_description().await.is_none() {
            self.stun_buffer.push(candidate_init);
        } else {
            self.peer_connection
            .add_ice_candidate(candidate_init)
            .await
            .unwrap();
        }

    }
}

EDIT:

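I dug a little deeper into the WebRTC-rs code and found that on_track is only called once the first few incoming RTP packets have been peeked at. It turns out the packets I was sending were not valid: the RTP header was left entirely at its defaults. I changed the sender to set the version, payload type and sequence number, and it started working.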

tokio::spawn(async move {
    // Write to track
    let mut sequence_number = 0u16;
    loop {
        let packet = Packet {
            header: Header {
                version: 2,
                sequence_number,
                payload_type: 96,
                ..Default::default()
            },
            payload: vec![0u8; 2].into(), // Dummy 2-byte payload
        };

        // Increment the sequence number for the next packet
        sequence_number = sequence_number.wrapping_add(1);

        println!("Sending packet: {:?}", packet);

        // Write the RTP packet to the track
        track2
            .write_rtp_with_extensions(&packet, &[])
            .await
            .unwrap();
    }
});
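
To illustrate why the header fields matter, here is a minimal std-only sketch of the fixed 12-byte RTP header layout from RFC 3550. It is not the webrtc-rs implementation: `RtpHeader`, `rtp_version` and `advance` are hypothetical names, the SSRC value is arbitrary, and the timestamp step assumes one packet per frame on the 90 kHz clock used for H.264 video.

```rust
/// Fixed 12-byte RTP header fields (RFC 3550), without CSRCs or extensions.
struct RtpHeader {
    payload_type: u8, // 7 bits; 96 matches the payload type used above
    sequence_number: u16,
    timestamp: u32,
    ssrc: u32,
}

impl RtpHeader {
    /// Serialize with version 2, no padding, no extension, no marker bit.
    fn to_bytes(&self) -> [u8; 12] {
        let mut buf = [0u8; 12];
        buf[0] = 2 << 6; // the version lives in the top two bits of byte 0
        buf[1] = self.payload_type & 0x7f;
        buf[2..4].copy_from_slice(&self.sequence_number.to_be_bytes());
        buf[4..8].copy_from_slice(&self.timestamp.to_be_bytes());
        buf[8..12].copy_from_slice(&self.ssrc.to_be_bytes());
        buf
    }
}

/// Read the version field back out of a serialized packet.
fn rtp_version(packet: &[u8]) -> Option<u8> {
    packet.first().map(|b| b >> 6)
}

/// Advance to the next packet: the sequence number wraps at u16::MAX and the
/// timestamp moves forward by one frame interval on a 90 kHz clock.
/// `frames_per_second` must be non-zero.
fn advance(header: &mut RtpHeader, frames_per_second: u32) {
    header.sequence_number = header.sequence_number.wrapping_add(1);
    header.timestamp = header.timestamp.wrapping_add(90_000 / frames_per_second);
}

fn main() {
    let mut header = RtpHeader {
        payload_type: 96,
        sequence_number: u16::MAX,
        timestamp: 0,
        ssrc: 0x1234_5678, // arbitrary value for illustration
    };
    let bytes = header.to_bytes();
    println!("version = {:?}, first bytes = {:02x?}", rtp_version(&bytes), &bytes[..4]);

    advance(&mut header, 30);
    println!("next seq = {}, next ts = {}", header.sequence_number, header.timestamp);

    // An all-zero buffer, like a header left at its defaults, reports version 0.
    println!("zeroed version = {:?}", rtp_version(&[0u8; 12]));
}
```

The snippet above only increments the sequence number; advancing the timestamp as well, as `advance` does here, is an optional refinement for real media rather than something the fix required.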
