Skip to content

Commit

Permalink
add init webrtc code to rtc
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Mar 18, 2024
1 parent b040816 commit 62741ba
Show file tree
Hide file tree
Showing 89 changed files with 24,245 additions and 15 deletions.
382 changes: 367 additions & 15 deletions rtc-shared/src/error.rs

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions rtc/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# rtc changelog

## Unreleased
44 changes: 44 additions & 0 deletions rtc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,51 @@ authors = ["Rain Liu <[email protected]>"]
edition = "2021"
description = "RTC in Rust"
license = "MIT/Apache-2.0"
documentation = "https://docs.rs/rtc"
homepage = "https://webrtc.rs"
repository = "https://github.com/webrtc-rs/rtc"
readme = "../README.md"

[dependencies]
datachannel = { version = "0.2.0", path = "../rtc-datachannel", package = "rtc-datachannel" }
dtls = { version = "0.2.0", path = "../rtc-dtls", package = "rtc-dtls" }
ice = { version = "0.2.0", path = "../rtc-ice", package = "rtc-ice" }
#TODO: interceptor = { version = "0.0.0", path = "../rtc-interceptor", package = "rtc-interceptor" }
#TODO: mdns = { version = "0.0.0", path = "../mdns", package = "rtc-mdns" }
#TODO: media = { version = "0.0.0", path = "../media", package = "rtc-media" }
rtcp = { version = "0.2.0", path = "../rtc-rtcp", package = "rtc-rtcp" }
rtp = { version = "0.2.0", path = "../rtc-rtp", package = "rtc-rtp" }
sctp = { version = "0.2.0", path = "../rtc-sctp", package = "rtc-sctp" }
sdp = { version = "0.2.0", path = "../rtc-sdp", package = "rtc-sdp" }
shared = { version = "0.2.1", path = "../rtc-shared", package = "rtc-shared", default-features = false, features = ["crypto", "marshal", "replay"] }
srtp = { version = "0.2.0", path = "../rtc-srtp", package = "rtc-srtp" }
stun = { version = "0.2.0", path = "../rtc-stun", package = "rtc-stun" }
turn = { version = "0.1.0", path = "../rtc-turn", package = "rtc-turn" }

log = "0.4.21"
serde = { version = "1.0.197", features = ["derive"] }
serde_json = "1.0.114"
rand = "0.8.5"
bytes = "1.5.0"
regex = "1.10.3"
url = "2.5.0"
rustls = { version = "0.21", features = ["dangerous_configuration"] }
rcgen = { version = "0.11", features = ["pem", "x509-parser"] }
ring = "0.17.8"
sha2 = "0.10.8"
hex = "0.4.3"
pem = { version = "3.0.3", optional = true }
time = "0.3.34"
cfg-if = "1.0.0"
lazy_static = "1.4.0"
smol_str = { version = "0.2.1", features = ["serde"] }

[dev-dependencies]
tokio-test = "0.4.4"
env_logger = "0.10.2"
arc-swap = "1.7.0"
tokio = { version = "1.36.0", features = ["full"] }
waitgroup = "0.1.2"

[features]
pem = ["dep:pem", "dtls/pem"]
25 changes: 25 additions & 0 deletions rtc/src/api/api_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use super::*;

#[test]
fn test_new_api() -> Result<()> {
let mut s = SettingEngine::default();
s.detach_data_channels();
let mut m = MediaEngine::default();
m.register_default_codecs()?;

let api = APIBuilder::new()
.with_setting_engine(s)
.with_media_engine(m)
.build();

assert!(
api.setting_engine.detach.data_channels,
"Failed to set settings engine"
);
assert!(
!api.media_engine.audio_codecs.is_empty(),
"Failed to set media engine"
);

Ok(())
}
278 changes: 278 additions & 0 deletions rtc/src/api/interceptor_registry/interceptor_registry_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
/*TODO:
use super::*;
use crate::api::APIBuilder;
use crate::peer_connection::configuration::RTCConfiguration;
use bytes::Bytes;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use interceptor::mock::mock_builder::MockBuilder;
use interceptor::mock::mock_interceptor::MockInterceptor;
use interceptor::stream_info::StreamInfo;
use interceptor::{Attributes, Interceptor, RTPWriter, RTPWriterFn};
// E2E test of the features of Interceptors
// * Assert an extension can be set on an outbound packet
// * Assert an extension can be read on an outbound packet
// * Assert that attributes set by an interceptor are returned to the Reader
#[tokio::test]
async fn test_peer_connection_interceptor() -> Result<()> {
let create_pc = || async {
let mut m = MediaEngine::default();
m.register_default_codecs()?;
let mut ir = Registry::new();
let BindLocalStreamFn = |info: &StreamInfo,
writer: Arc<dyn RTPWriter + Send + Sync>|
-> Pin<
Box<dyn Future<Output = Arc<dyn RTPWriter + Send + Sync>> + Send + Sync>,
> {
let writer2 = Arc::clone(&writer);
Box::pin(async move {
Arc::new(RTPWriterFn(Box::new(
move |in_pkt: &rtp::packet::Packet,
attributes: &Attributes|
-> Pin<
Box<
dyn Future<Output = std::result::Result<usize, interceptor::Error>>
+ Send
+ Sync,
>,
> {
let writer3 = Arc::clone(&writer2);
let a = attributes.clone();
// set extension on outgoing packet
let mut out_pkt = in_pkt.clone();
out_pkt.header.extension = true;
out_pkt.header.extension_profile = 0xBEDE;
Box::pin(async move {
out_pkt
.header
.set_extension(2, Bytes::from_static(b"foo"))?;
//writer3.write(&out_pkt, &a).await
Ok(0)
})
},
))) as Arc<dyn RTPWriter + Send + Sync>
})
};
BindRemoteStreamFn: func(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
if a == nil {
a = interceptor.Attributes{}
}
a.Set("attribute", "value")
return reader.Read(b, a)
})
},
let mock_builder = Box::new(MockBuilder {
build:
Box::new(
|_: &str| -> std::result::Result<
Arc<dyn Interceptor + Send + Sync>,
interceptor::Error,
> {
Ok(Arc::new(MockInterceptor {
..Default::default()
}))
},
),
});
let mock_builder = MockBuilder::new(
|_: &str| -> std::result::Result<
Arc<dyn Interceptor + Send + Sync>,
interceptor::Error,
> {
Ok(Arc::new(MockInterceptor {
..Default::default()
}))
},
);
ir.add(Box::new(mock_builder));
let api = APIBuilder::new()
.with_media_engine(m)
.with_interceptor_registry(ir)
.build();
api.new_peer_connection(RTCConfiguration::default()).await
};
let offerer = create_pc().await?;
let answerer = create_pc().await?;
track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
assert.NoError(t, err)
_, err = offerer.AddTrack(track)
assert.NoError(t, err)
seenRTP, seenRTPCancel := context.WithCancel(context.Background())
answerer.OnTrack(func(track *TrackRemote, receiver *RTPReceiver) {
p, attributes, readErr := track.ReadRTP()
assert.NoError(t, readErr)
assert.Equal(t, p.Extension, true)
assert.Equal(t, "foo", string(p.GetExtension(2)))
assert.Equal(t, "value", attributes.Get("attribute"))
seenRTPCancel()
})
assert.NoError(t, signalPair(offerer, answerer))
func() {
ticker := time.NewTicker(time.Millisecond * 20)
for {
select {
case <-seenRTP.Done():
return
case <-ticker.C:
assert.NoError(t, track.WriteSample(media.Sample{Data: []byte{0x00}, Duration: time.Second}))
}
}
}()
closePairNow(t, offerer, answerer)
Ok(())
}
func Test_Interceptor_BindUnbind(t *testing.T) {
lim := test.TimeOut(time.Second * 10)
defer lim.Stop()
report := test.CheckRoutines(t)
defer report()
m := &MediaEngine{}
assert.NoError(t, m.RegisterDefaultCodecs())
var (
cntBindRTCPReader uint32
cntBindRTCPWriter uint32
cntBindLocalStream uint32
cntUnbindLocalStream uint32
cntBindRemoteStream uint32
cntUnbindRemoteStream uint32
cntClose uint32
)
mockInterceptor := &mock_interceptor.Interceptor{
BindRTCPReaderFn: func(reader interceptor.RTCPReader) interceptor.RTCPReader {
atomic.AddUint32(&cntBindRTCPReader, 1)
return reader
},
BindRTCPWriterFn: func(writer interceptor.RTCPWriter) interceptor.RTCPWriter {
atomic.AddUint32(&cntBindRTCPWriter, 1)
return writer
},
BindLocalStreamFn: func(i *interceptor.StreamInfo, writer interceptor.RTPWriter) interceptor.RTPWriter {
atomic.AddUint32(&cntBindLocalStream, 1)
return writer
},
UnbindLocalStreamFn: func(i *interceptor.StreamInfo) {
atomic.AddUint32(&cntUnbindLocalStream, 1)
},
BindRemoteStreamFn: func(i *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
atomic.AddUint32(&cntBindRemoteStream, 1)
return reader
},
UnbindRemoteStreamFn: func(i *interceptor.StreamInfo) {
atomic.AddUint32(&cntUnbindRemoteStream, 1)
},
CloseFn: func() error {
atomic.AddUint32(&cntClose, 1)
return nil
},
}
ir := &interceptor.Registry{}
ir.Add(&mock_interceptor.Factory{
NewInterceptorFn: func(_ string) (interceptor.Interceptor, error) { return mockInterceptor, nil },
})
sender, receiver, err := NewAPI(WithMediaEngine(m), WithInterceptorRegistry(ir)).newPair(Configuration{})
assert.NoError(t, err)
track, err := NewTrackLocalStaticSample(RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion")
assert.NoError(t, err)
_, err = sender.AddTrack(track)
assert.NoError(t, err)
receiverReady, receiverReadyFn := context.WithCancel(context.Background())
receiver.OnTrack(func(track *TrackRemote, _ *RTPReceiver) {
_, _, readErr := track.ReadRTP()
assert.NoError(t, readErr)
receiverReadyFn()
})
assert.NoError(t, signalPair(sender, receiver))
ticker := time.NewTicker(time.Millisecond * 20)
defer ticker.Stop()
func() {
for {
select {
case <-receiverReady.Done():
return
case <-ticker.C:
// Send packet to make receiver track actual creates RTPReceiver.
assert.NoError(t, track.WriteSample(media.Sample{Data: []byte{0xAA}, Duration: time.Second}))
}
}
}()
closePairNow(t, sender, receiver)
// Bind/UnbindLocal/RemoteStream should be called from one side.
if cnt := atomic.LoadUint32(&cntBindLocalStream); cnt != 1 {
t.Errorf("BindLocalStreamFn is expected to be called once, but called %d times", cnt)
}
if cnt := atomic.LoadUint32(&cntUnbindLocalStream); cnt != 1 {
t.Errorf("UnbindLocalStreamFn is expected to be called once, but called %d times", cnt)
}
if cnt := atomic.LoadUint32(&cntBindRemoteStream); cnt != 1 {
t.Errorf("BindRemoteStreamFn is expected to be called once, but called %d times", cnt)
}
if cnt := atomic.LoadUint32(&cntUnbindRemoteStream); cnt != 1 {
t.Errorf("UnbindRemoteStreamFn is expected to be called once, but called %d times", cnt)
}
// BindRTCPWriter/Reader and Close should be called from both side.
if cnt := atomic.LoadUint32(&cntBindRTCPWriter); cnt != 2 {
t.Errorf("BindRTCPWriterFn is expected to be called twice, but called %d times", cnt)
}
if cnt := atomic.LoadUint32(&cntBindRTCPReader); cnt != 2 {
t.Errorf("BindRTCPReaderFn is expected to be called twice, but called %d times", cnt)
}
if cnt := atomic.LoadUint32(&cntClose); cnt != 2 {
t.Errorf("CloseFn is expected to be called twice, but called %d times", cnt)
}
}
func Test_InterceptorRegistry_Build(t *testing.T) {
registryBuildCount := 0
ir := &interceptor.Registry{}
ir.Add(&mock_interceptor.Factory{
NewInterceptorFn: func(_ string) (interceptor.Interceptor, error) {
registryBuildCount++
return &interceptor.NoOp{}, nil
},
})
peerConnectionA, err := NewAPI(WithInterceptorRegistry(ir)).NewPeerConnection(Configuration{})
assert.NoError(t, err)
peerConnectionB, err := NewAPI(WithInterceptorRegistry(ir)).NewPeerConnection(Configuration{})
assert.NoError(t, err)
assert.Equal(t, 2, registryBuildCount)
closePairNow(t, peerConnectionA, peerConnectionB)
}
*/
Loading

0 comments on commit 62741ba

Please sign in to comment.