Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

providing client code with ability to publish user data #10

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cpp/client/src/video-source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ VideoSource::VideoSource(io_service& io_service,
const std::string& sourcePath,
const boost::shared_ptr<RawFrame>& frame):
io_(io_service), frame_(frame), isRunning_(false), framesSourced_(0),
nRewinds_(0)
nRewinds_(0), userDataSize_(0), userData_(nullptr)
{
assert(frame.get());

Expand Down Expand Up @@ -107,7 +107,8 @@ void VideoSource::deliverFrame(const RawFrame& frame)
{
for (auto capturer:capturers_)
capturer->incomingArgbFrame(frame.getWidth(), frame.getHeight(),
frame.getBuffer().get(), frame.getFrameSizeInBytes());
frame.getBuffer().get(), frame.getFrameSizeInBytes(),
userDataSize_, userData_);

// LogTrace("") << "delivered frame to " << capturers_.size() << " capturers" << endl;
}
Expand Down
5 changes: 3 additions & 2 deletions cpp/client/src/video-source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class VideoSource {

private:
bool isRunning_;
unsigned int framesSourced_, nRewinds_;
unsigned char* userData_;
unsigned int framesSourced_, nRewinds_, userDataSize_;
boost::shared_ptr<FileFrameSource> source_;
boost::shared_ptr<RawFrame> frame_;
std::vector<ndnrtc::IExternalCapturer*> capturers_;
Expand All @@ -55,4 +56,4 @@ class VideoSource {
void deliverFrame(const RawFrame& frame);
};

#endif
#endif
8 changes: 6 additions & 2 deletions cpp/include/interfaces.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ namespace ndnrtc
virtual int incomingArgbFrame(const unsigned int width,
const unsigned int height,
unsigned char* argbFrameData,
unsigned int frameSize) = 0;
unsigned int frameSize,
unsigned int userDataSize = 0,
unsigned char* userData = nullptr) = 0;

/**
* Alternative method for delivering YUV frames (I420 or y420 or
Expand All @@ -90,7 +92,9 @@ namespace ndnrtc
const unsigned int strideV,
const unsigned char* yBuffer,
const unsigned char* uBuffer,
const unsigned char* vBuffer) = 0;
const unsigned char* vBuffer,
unsigned int userDataSize = 0,
unsigned char* userData = nullptr) = 0;
};
}

Expand Down
10 changes: 7 additions & 3 deletions cpp/include/local-stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,9 @@ namespace ndnrtc {
int incomingArgbFrame(const unsigned int width,
const unsigned int height,
unsigned char* argbFrameData,
unsigned int frameSize);
unsigned int frameSize,
unsigned int userDataSize = 0,
unsigned char* userData = nullptr);

/**
* Encode and publish I420 frame data.
Expand All @@ -190,7 +192,9 @@ namespace ndnrtc {
const unsigned int strideV,
const unsigned char* yBuffer,
const unsigned char* uBuffer,
const unsigned char* vBuffer);
const unsigned char* vBuffer,
unsigned int userDataSize = 0,
unsigned char* userData = nullptr);

/**
* Returns full stream prefix used for publishing data
Expand Down Expand Up @@ -225,4 +229,4 @@ namespace ndnrtc {
};
}

#endif
#endif
59 changes: 54 additions & 5 deletions cpp/src/frame-data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ namespace ndnrtc {
ENABLE_IF(T,Mutable)
VideoFramePacketT(const webrtc::EncodedImage& frame):
HeaderPacketT<CommonHeader,T>(frame._length, frame._buffer),
isSyncListSet_(false)
isSyncListSet_(false), isUserDataSet_(false)
{
assert(frame._encodedWidth);
assert(frame._encodedHeight);
Expand Down Expand Up @@ -652,13 +652,40 @@ namespace ndnrtc {
typedef typename std::vector<typename DataPacketT<T>::Blob>::const_iterator BlobIterator;
std::map<std::string, PacketNumber> syncList;

for (BlobIterator blob = this->blobs_.begin()+1;
blob+1 < this->blobs_.end(); blob+=2)
syncList[std::string((const char*)blob->data(), blob->size())] = *(PacketNumber*)(blob+1)->data();
for (BlobIterator blob = this->blobs_.begin()+1;blob+1 < this->blobs_.end(); blob+=2)
{
std::string mapKey((const char*)blob->data(), blob->size());
std::map<std::string, PacketNumber>::iterator it = syncList.find(mapKey);
if(it == syncList.end())
syncList[mapKey] = *(PacketNumber*)(blob+1)->data();
}

return boost::move(syncList);
}

const std::map<std::string, std::string> getUserData() const
{
typedef typename std::vector<typename DataPacketT<T>::Blob>::const_iterator BlobIterator;
std::map<std::string, std::string> userData;
std::vector<std::string> keyVector;

for (BlobIterator blob = this->blobs_.begin()+1;blob+1 < this->blobs_.end(); blob+=2)
{
std::string mapKey((const char*)blob->data(), blob->size());
std::vector<std::string>::iterator it = std::find(keyVector.begin(), keyVector.end(), mapKey);

if(it == keyVector.end())
keyVector.push_back(mapKey);
else
{
std::string ud((const char*)(blob+1)->data());
userData[mapKey] = ud;
}
}

return boost::move(userData);
}

ENABLE_IF(T,Mutable)
boost::shared_ptr<NetworkData>
getParityData(size_t segmentLength, double ratio)
Expand Down Expand Up @@ -702,6 +729,28 @@ namespace ndnrtc {
isSyncListSet_ = true;
}

/**
* Provide user data along with video frames for client code.
* e.g. unsigned char userData[] = "data\0", and its length is defined as 4.
* NOTE: User data should be end up with '\0'.
*/
ENABLE_IF(T,Mutable)
void setUserData(const std::map<std::string , std::pair<unsigned int, unsigned char*>>& userData)
{
if (this->isHeaderSet()) throw std::runtime_error("Can't add more data to this packet"
" as header has been set already");
if (isUserDataSet_) throw std::runtime_error("User Data has been already set");

for (auto it:userData)
{
std::string ud(it.second.second, it.second.second+it.second.first+1);
this->addBlob(it.first.size(), (uint8_t*)it.first.c_str());
this->addBlob(ud.length(), (uint8_t*)ud.c_str());
}

isUserDataSet_ = true;
}

typedef std::vector<ImmutableHeaderPacket<VideoFrameSegmentHeader>> ImmutableVideoSegmentsVector;
typedef std::vector<ImmutableHeaderPacket<DataSegmentHeader>> ImmutableRecoverySegmentsVector;

Expand All @@ -720,7 +769,7 @@ namespace ndnrtc {
} __attribute__((packed)) Header;

webrtc::EncodedImage frame_;
bool isSyncListSet_;
bool isSyncListSet_, isUserDataSet_;
};

typedef VideoFramePacketT<> VideoFramePacket;
Expand Down
13 changes: 9 additions & 4 deletions cpp/src/local-stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,12 @@ void LocalVideoStream::removeThread(const string& threadName)
int LocalVideoStream::incomingArgbFrame(const unsigned int width,
const unsigned int height,
unsigned char* argbFrameData,
unsigned int frameSize)
unsigned int frameSize,
unsigned int userDataSize,
unsigned char* userData)
{
pimpl_->incomingFrame(ArgbRawFrameWrapper({width, height, argbFrameData, frameSize}));
pimpl_->incomingFrame(ArgbRawFrameWrapper({width, height, argbFrameData, frameSize}),
userDataSize, userData);
return RESULT_OK;
}

Expand All @@ -148,10 +151,12 @@ int LocalVideoStream::incomingI420Frame(const unsigned int width,
const unsigned int strideV,
const unsigned char* yBuffer,
const unsigned char* uBuffer,
const unsigned char* vBuffer)
const unsigned char* vBuffer,
unsigned int userDataSize,
unsigned char* userData)
{
pimpl_->incomingFrame(I420RawFrameWrapper({width, height, strideY, strideU,
strideV, yBuffer, uBuffer, vBuffer}));
strideV, yBuffer, uBuffer, vBuffer}), userDataSize, userData);
return RESULT_OK;
}

Expand Down
20 changes: 13 additions & 7 deletions cpp/src/video-stream-impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,16 @@ vector<string> VideoStreamImpl::getThreads() const
return threads;
}

void VideoStreamImpl::incomingFrame(const ArgbRawFrameWrapper& w)
void VideoStreamImpl::incomingFrame(const ArgbRawFrameWrapper& w, unsigned int userDataSize, unsigned char* userData)
{
LogDebugC << "⤹ incoming ARGB frame " << w.width_ << "x" << w.height_ << std::endl;
feedFrame(conv_ << w);
feedFrame(conv_ << w, userDataSize, userData);
}

void VideoStreamImpl::incomingFrame(const I420RawFrameWrapper& w)
void VideoStreamImpl::incomingFrame(const I420RawFrameWrapper& w, unsigned int userDataSize, unsigned char* userData)
{
LogDebugC << "⤹ incoming I420 frame " << w.width_ << "x" << w.height_ << std::endl;
feedFrame(conv_ << w);
feedFrame(conv_ << w, userDataSize, userData);
}

void VideoStreamImpl::setLogger(boost::shared_ptr<ndnlog::new_api::Logger> logger)
Expand Down Expand Up @@ -149,7 +149,7 @@ void VideoStreamImpl::remove(const string& threadName)
}
}

void VideoStreamImpl::feedFrame(const WebRtcVideoFrame& frame)
void VideoStreamImpl::feedFrame(const WebRtcVideoFrame& frame, unsigned int userDataSize, unsigned char* userData)
{
(*statStorage_)[Indicator::CapturedNum]++;

Expand All @@ -165,30 +165,34 @@ void VideoStreamImpl::feedFrame(const WebRtcVideoFrame& frame)
LogDebugC << "↓ feeding "<< playbackCounter_ << "p into encoders..." << std::endl;

map<string,FutureFramePtr> futureFrames;
map<string, pair<unsigned int, unsigned char*>> allUserData;
for (auto it:threads_)
{
FutureFramePtr ff =
boost::make_shared<FutureFrame>(boost::move(boost::async(boost::launch::async,
boost::bind(&VideoThread::encode, it.second.get(), (*scalers_[it.first])(frame)))));
futureFrames[it.first] = ff;
allUserData[it.first] = make_pair(userDataSize, userData);
}

map<string, FramePacketPtr> frames;
map<string, pair<unsigned int, unsigned char*>> filteredUserData;
for (auto it:futureFrames)
{
FramePacketPtr f(it.second->get());
if (f.get())
{
(*statStorage_)[Indicator::EncodedNum]++;
frames[it.first] = f;
filteredUserData[it.first] = allUserData[it.first];
}
}

(*statStorage_)[Indicator::DroppedNum] += (threads_.size()-frames.size());

if (frames.size())
{
publish(frames);
publish(frames, filteredUserData);
playbackCounter_++;
}

Expand All @@ -203,7 +207,8 @@ void VideoStreamImpl::feedFrame(const WebRtcVideoFrame& frame)
LogWarnC << "incoming frame was given, but there are no threads" << std::endl;
}

void VideoStreamImpl::publish(map<string, FramePacketPtr>& frames)
void
VideoStreamImpl::publish(map<string, FramePacketPtr>& frames, map<string, pair<unsigned int, unsigned char*>> filteredUserData)
{
LogTraceC << "will publish " << frames.size() << " frames" << std::endl;

Expand All @@ -217,6 +222,7 @@ void VideoStreamImpl::publish(map<string, FramePacketPtr>& frames)
packetHdr.publishUnixTimestampMs_ = clock::unixTimestamp();

it.second->setSyncList(getCurrentSyncList(isKey));
it.second->setUserData(filteredUserData);
it.second->setHeader(packetHdr);

LogTraceC << "thread " << it.first << " " << packetHdr.sampleRate_
Expand Down
13 changes: 8 additions & 5 deletions cpp/src/video-stream-impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <boost/thread.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/atomic.hpp>
#include <boost/assign.hpp>

#include "media-stream-base.hpp"
#include "ndnrtc-object.hpp"
Expand Down Expand Up @@ -46,8 +47,8 @@ namespace ndnrtc {

std::vector<std::string> getThreads() const;

void incomingFrame(const ArgbRawFrameWrapper&);
void incomingFrame(const I420RawFrameWrapper&);
void incomingFrame(const ArgbRawFrameWrapper&, unsigned int userDataSize=0, unsigned char* userData=nullptr);
void incomingFrame(const I420RawFrameWrapper&, unsigned int userDataSize=0, unsigned char* userData=nullptr);
void setLogger(boost::shared_ptr<ndnlog::new_api::Logger>);

private:
Expand Down Expand Up @@ -88,12 +89,14 @@ namespace ndnrtc {
void remove(const std::string& threadName);
bool updateMeta();

void feedFrame(const WebRtcVideoFrame& frame);
void publish(std::map<std::string, boost::shared_ptr<VideoFramePacketAlias>>& frames);
void feedFrame(const WebRtcVideoFrame& frame, unsigned int userDataSize=0, unsigned char* userData=nullptr);
void publish(std::map<std::string, boost::shared_ptr<VideoFramePacketAlias>>& frames,
std::map<std::string, std::pair<unsigned int, unsigned char*>> filteredUserData =
boost::assign::map_list_of("", std::make_pair(0, nullptr)));
void publish(const std::string& thread, boost::shared_ptr<VideoFramePacketAlias>& fp);
void publishManifest(ndn::Name dataName, PublishedDataPtrVector& segments);
std::map<std::string, PacketNumber> getCurrentSyncList(bool forKey = false);
};
}

#endif
#endif
8 changes: 6 additions & 2 deletions cpp/tests/mock-objects/external-capturer-mock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ class MockExternalCapturer : public ndnrtc::IExternalCapturer
MOCK_METHOD4(incomingArgbFrame, int(const unsigned int width,
const unsigned int height,
unsigned char* argbFrameData,
unsigned int frameSize));
unsigned int frameSize,
unsigned int userDataSize,
unsigned char* userData));
MOCK_METHOD8(incomingI420Frame, int(const unsigned int width,
const unsigned int height,
const unsigned int strideY,
const unsigned int strideU,
const unsigned int strideV,
const unsigned char* yBuffer,
const unsigned char* uBuffer,
const unsigned char* vBuffer));
const unsigned char* vBuffer,
unsigned int userDataSize,
unsigned char* userData));
};

#endif
Loading