diff --git a/configure.cpp b/configure.cpp index 57bad4e..6725068 100644 --- a/configure.cpp +++ b/configure.cpp @@ -55,6 +55,7 @@ Configure::Configure(vbit::Debug *debug, int argc, char** argv) : _PID = 0x20; // default PID is 0x20 _packetServerPort = 0; // port 0 disables packet server + _datacastServerPort = 0; uint8_t priority[8]={9,3,3,6,3,3,5,6}; // 1=High priority,9=low. Note: priority[0] is mag 8 @@ -241,6 +242,29 @@ Configure::Configure(vbit::Debug *debug, int argc, char** argv) : exit(EXIT_FAILURE); } } + else if (arg == "--datacast") + { + if (i + 1 < argc) + { + errno = 0; + char *end_ptr; + long l = std::strtol(argv[++i], &end_ptr, 10); + if (errno == 0 && *end_ptr == '\0' && l > 0 && l < 65536) + { + _datacastServerPort = (int)l; + } + else + { + std::cerr << "invalid server port number\n"; + exit(EXIT_FAILURE); + } + } + else + { + std::cerr << "--datacast requires a port number\n"; + exit(EXIT_FAILURE); + } + } else { std::cerr << "unrecognised argument: " << arg << std::endl; diff --git a/configure.h b/configure.h index 13cd6fa..67bc1c3 100644 --- a/configure.h +++ b/configure.h @@ -68,6 +68,9 @@ class Configure uint16_t GetPacketServerPort(){return _packetServerPort;} bool GetPacketServerEnabled(){return _packetServerPort != 0;} + uint16_t GetDatacastServerPort(){return _datacastServerPort;} + bool GetDatacastServerEnabled(){return _datacastServerPort != 0;} + private: vbit::Debug* _debug; int DirExists(std::string *path); @@ -104,6 +107,7 @@ class Configure uint16_t _PID; uint16_t _packetServerPort; + uint16_t _datacastServerPort; }; } diff --git a/datacastServer.cpp b/datacastServer.cpp new file mode 100644 index 0000000..840a0c5 --- /dev/null +++ b/datacastServer.cpp @@ -0,0 +1,343 @@ +/* Provide a TCP server for injecting datacast packet streams */ + +#include "datacastServer.h" + +using namespace ttx; + +DatacastServer::DatacastServer(ttx::Configure *configure, vbit::Debug *debug) : + _debug(debug), + _portNumber(configure->GetDatacastServerPort()), + _isActive(false) +{ + /* initialise sockets */ + _serverSock = -1; + + for (int i=0; i < MAXCLIENTS; i++) + { + _clientSocks[i] = -1; + _clientChannel[i] = -1; // no channel set + } + + _datachannel[0]=nullptr; // can't use datachannel 0 + for (int dc=1; dc<16; dc++) + { + _datachannel[dc] = new vbit::PacketDatacast(dc, configure); // create 15 datacast channels + } +} + +DatacastServer::~DatacastServer() +{ +} + +void DatacastServer::SocketError(std::string errorMessage) +{ + if (_serverSock >= 0) + { + #ifdef WIN32 + closesocket(_serverSock); + #else + close(_serverSock); + #endif + } + + for (int i = 0; i < MAXCLIENTS; i++) + { + if (_clientSocks[i] >= 0) + #ifdef WIN32 + closesocket(_clientSocks[i]); + #else + close(_clientSocks[i]); + #endif + _clientChannel[i] = -1; // no channel set + } + + std::cerr << errorMessage; +} + +void DatacastServer::run() +{ + _debug->Log(vbit::Debug::LogLevels::logDEBUG,"[DatacastServer::run] Datacast server thread started"); + + int newSock; + int sock; + struct sockaddr_in address; + char readBuffer[256]; + fd_set readfds; + +#ifdef WIN32 + int addrlen; + WSADATA wsaData; + int iResult; + + // Initialize Winsock + iResult = WSAStartup(MAKEWORD(2,2), &wsaData); + if (iResult != 0) + { + SocketError("[DatacastServer::run] WSAStartup failed\n"); + return; + } +#else + unsigned int addrlen; +#endif + + /* Create socket */ + if ((_serverSock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) + { + SocketError("[DatacastServer::run] socket() failed\n"); + return; + } + + int reuse = true; + + /* Allow multiple connnections */ + if(setsockopt(_serverSock, SOL_SOCKET, SO_REUSEADDR, (const char *)&reuse, sizeof(reuse)) < 0) + { + SocketError("[DatacastServer::run] setsockopt() SO_REUSEADDR failed\n"); + return; + } + + address.sin_family = AF_INET; + address.sin_addr.s_addr = htonl(INADDR_ANY); + address.sin_port = htons(_portNumber); + + /* bind socked */ + if (bind(_serverSock, (struct sockaddr *) &address, sizeof(address)) < 0) + { + SocketError("[DatacastServer::run] bind() failed\n"); + return; + } + + /* Listen for incoming connections */ + if (listen(_serverSock, MAXPENDING) < 0) + { + SocketError("[DatacastServer::run] listen() failed\n"); + return; + } + + addrlen = sizeof(address); + + while(true) + { + FD_ZERO(&readfds); + FD_SET(_serverSock, &readfds); + + bool active = false; + for (int i = 0; i < MAXCLIENTS; i++) + { + sock = _clientSocks[i]; + + if(sock >= 0){ + FD_SET(sock , &readfds); + active = true; /* packet server has connections */ + } + } + _isActive = active; + + /* wait for activity on any socket */ + if ((select(FD_SETSIZE, &readfds, NULL, NULL, NULL) < 0) && (errno!=EINTR)) + SocketError("[DatacastServer::run] select() failed"); + + if (FD_ISSET(_serverSock, &readfds)) + { + /* incoming connection to server */ + if ((newSock = accept(_serverSock, (struct sockaddr *)&address, &addrlen))<0) + { + SocketError("[DatacastServer::run] accept() failed"); + return; + } + + #ifdef WIN32 + u_long ul = 1; + if (ioctlsocket(newSock, FIONBIO, &ul) < 0) + { + SocketError("[DatacastServer::run] ioctlsocket() failed"); + return; + } + #else + if (fcntl(newSock, F_SETFL, fcntl(newSock, F_GETFL, 0) | O_NONBLOCK) < 0) + { + SocketError("[DatacastServer::run] fcntl() failed"); + return; + } + #endif + + for (int i = 0; i <= MAXCLIENTS; i++) + { + if (i == MAXCLIENTS) + { + /* no more client slots so reject */ + #ifdef WIN32 + closesocket(newSock); + #else + close(newSock); + #endif + _debug->Log(vbit::Debug::LogLevels::logWARN,"[DatacastServer::run] reject new connection from " + std::string(inet_ntoa(address.sin_addr)) + " (too many connections)"); + break; + } + + /* find unused slot */ + if( _clientSocks[i] < 0 ) + { + /* add to active sockets */ + _clientSocks[i] = newSock; + _clientChannel[i] = -1; // no channel set + _debug->Log(vbit::Debug::LogLevels::logINFO,"[DatacastServer::run] new connection from " + std::string(inet_ntoa(address.sin_addr)) + ":" + std::to_string(ntohs(address.sin_port)) + " as socket " + std::to_string(newSock)); + break; + } + } + } + else + { + /* a client socket has activity */ + for (int i = 0; i < MAXCLIENTS; i++) + { + sock = _clientSocks[i]; + + if (sock >= 0 && FD_ISSET(sock , &readfds)) + { + /* socket has activity */ + + int n = recv(sock, readBuffer, 1, MSG_PEEK); // peek at first byte of message + if (n == 1) + { + // byte 0 of message is message length + int len = (uint8_t)readBuffer[0]; + n = recv(sock, readBuffer, len, 0); // try to read whole message + if (n == len) + { + std::vector res = {DCOK}; + + // byte 1 of message is command number + switch (readBuffer[1]){ + case DCSET: // set datacast channel + { + int ch = readBuffer[2]; + if (n == 3 && ch >= 0 && ch <= 15) + { + _clientChannel[i] = -1; // release a current datachannel + + // check if another client is using desired datachannel + for (int j=0; j 0 && n == 42) // 40 bytes of packet data + { + std::vector data(readBuffer+2, readBuffer+n); + + if(_datachannel[_clientChannel[i]]->PushRaw(&data)) + { + res[0] = DCFULL; // buffer full + } + } + else + { + res[0] = DCERR; + } + break; + } + + case DCFORMATA: // encode and push a format A datacast payload to buffer + { + /* Does _not_ automate repeats, continuity indicator, etc. + Format is: + byte 2: bits 4-7 IAL, bits 1-3 flags RI,CI,DL + byte 3-5: 24 bit Service Packet Address (little endian) + byte 6: Repeat indicator + byte 7: Continuity indicator + byte 8+: payload data + */ + if (_clientChannel[i] > 0 && n > 8) + { + uint8_t flags = readBuffer[2] & 0xe; + uint8_t ial = readBuffer[2] >> 4; + uint32_t spa = readBuffer[3] | (readBuffer[4] << 8) | (readBuffer[5] << 16); + uint8_t ri = readBuffer[6]; + uint8_t ci = readBuffer[7]; + + std::vector data(readBuffer+8, readBuffer+n); + + int bytes = _datachannel[_clientChannel[i]]->PushIDLA(flags, ial, spa, ri, ci, &data); + + if (bytes == 0) // buffer full + res[0] = DCFULL; + else if (bytes < n-8) // payload didn't fit + res[0] = DCTRUNC; // warn of truncation + + res.push_back(bytes); // return number of bytes written + + break; + } + else + { + res[0] = DCERR; + res.push_back(0); // no bytes written + } + break; + } + + default: // unknown command + { + res[0] = DCERR; + break; + } + } + + if (res.size() > 254) + { + _debug->Log(vbit::Debug::LogLevels::logERROR,"[DatacastServer::run] Response too long"); + res.resize(254); // truncate! + } + + res.insert(res.begin(), res.size()+1); // prepend message size + + int n = send(sock, (char*)res.data(), res.size(), 0); // send response + if (n > 0) + continue; // next socket in loop + } + // else fall through to error handling + } + // couldn't read/write all bytes + getpeername(sock, (struct sockaddr*)&address, &addrlen); + if (n == 0) + { + /* client disconnected */ + _debug->Log(vbit::Debug::LogLevels::logINFO,"[DatacastServer::run] closing connection from " + std::string(inet_ntoa(address.sin_addr)) + ":" + std::to_string(ntohs(address.sin_port)) + " on socket " + std::to_string(sock)); + } + else + { + #ifdef WIN32 + int e = WSAGetLastError(); + #else + int e = errno; + #endif + + _debug->Log(vbit::Debug::LogLevels::logWARN,"[DatacastServer::run] closing connection from " + std::string(inet_ntoa(address.sin_addr)) + ":" + std::to_string(ntohs(address.sin_port)) + " recv error " + std::to_string(e) + " on socket " + std::to_string(sock)); + } + + /* close the socket when any error occurs */ + _clientSocks[i] = -1; /* free slot */ + + #ifdef WIN32 + closesocket(sock); + #else + close(sock); + #endif + } + } + } + } +} diff --git a/datacastServer.h b/datacastServer.h new file mode 100644 index 0000000..bb935ff --- /dev/null +++ b/datacastServer.h @@ -0,0 +1,62 @@ +#ifndef _DATACASTSERVER_H_ +#define _DATACASTSERVER_H_ + +#include "configure.h" +#include "debug.h" +#include "packet.h" +#include "packetDatacast.h" + +#ifdef WIN32 +#include +#else +#include +#include /* for socket(), bind(), and connect() */ +#include /* for fd_set() */ +#include /* for sockaddr_in and inet_ntoa() */ +#include /* for close() */ +#endif + +#define DCSET 0x00 +#define DCRAW 0x01 +#define DCFORMATA 0x02 + +#define DCOK 0x00 /* command successful */ +#define DCTRUNC 0xfd /* command completed but data was truncated */ +#define DCFULL 0xfe /* command failed as buffer is full */ +#define DCERR 0xff /* command failed */ + +namespace ttx + +{ + class DatacastServer + { + public: + DatacastServer(ttx::Configure *configure, vbit::Debug *debug); + ~DatacastServer(); + + void run(); + bool GetIsActive(){return _isActive;}; /* is the packet server running? */ + + + vbit::PacketDatacast** GetDatachannels() { vbit::PacketDatacast **channels=_datachannel; return channels; }; + + private: + vbit::Debug* _debug; + vbit::PacketDatacast* _datachannel[16]; /* array of datacast sources */ + + static const uint16_t MAXPENDING=5; + static const uint16_t MAXCLIENTS=5; + + int _portNumber; + int _serverSock; + + int _clientSocks[MAXCLIENTS]; + int _clientChannel[MAXCLIENTS]; + + bool _isActive; + + void SocketError(std::string errorMessage); // handle fatal socket errors + }; +} + +#endif diff --git a/packetDatacast.cpp b/packetDatacast.cpp new file mode 100644 index 0000000..c16c98f --- /dev/null +++ b/packetDatacast.cpp @@ -0,0 +1,103 @@ +/* Packet source for datacast channels */ + +#include "packetDatacast.h" + +using namespace vbit; + +PacketDatacast::PacketDatacast(uint8_t datachannel, ttx::Configure* configure) : + _datachannel(datachannel) +{ + uint16_t datacastLines = configure->GetDatacastLines(); + if (datacastLines == 0 || datacastLines > 4) + datacastLines = 4; /* cap at 4 lines */ + _bufferSize = datacastLines*4; /* assign space for around 4 fields */ + + for (int i=0; i<_bufferSize; i++){ + // build packet buffer + _packetBuffer.push_back(new vbit::Packet(8,25," ")); + } + // set head and tail indices + _head = 0; + _tail = 0; +} + +PacketDatacast::~PacketDatacast() +{ + +} + +int PacketDatacast::PushRaw(std::vector *data) +{ + /* push 40 bytes of raw packet data into the buffer */ + Packet* p = GetFreeBuffer(); + if (p == nullptr) + return -1; + + p->SetPacketRaw(*data); // copy data into buffer packet + _head = (_head + 1) % _bufferSize; // advance head on circular buffer + + return 0; +} + +int PacketDatacast::PushIDLA(uint8_t flags, uint8_t ial, uint32_t spa, uint8_t ri, uint8_t ci, std::vector *data) +{ + /* push a format A datacast packet into the buffer */ + int bytes = 0; + Packet* p = GetFreeBuffer(); + if (p != nullptr) + { + bytes = p->IDLA(_datachannel, flags, ial, spa, ri, ci, *data); + _head = (_head + 1) % _bufferSize; // advance head on circular buffer + } + + return bytes; +} + +Packet* PacketDatacast::GetFreeBuffer() +{ + /* gets a pointer to the next free buffer packet or a null pointer if buffer is full + Does NOT actually advance the head to avoid a race condition */ + + if ((_head + 1) % _bufferSize == _tail) + return nullptr; // buffer full + else + return _packetBuffer[_head]; +} + +Packet* PacketDatacast::GetPacket(Packet* p) +{ + if (_tail == _head) + { + // generate some hardcoded datacast filler + std::string str = "VBIT2 Datacast Service "; + std::vector data(str.begin(), str.end()); + p->IDLA(8, Packet::IDLA_DL, 6, 0xfffffe, 0, 0, data); + // TODO: make channel, address, and content of filler configurable + } + else + { + // copy data from buffer to packet + p->SetMRAG(_datachannel & 0x7,((_datachannel & 8) >> 3) + 30); + p->SetPacketRaw(std::vector(_packetBuffer[_tail]->Get_packet().begin()+5, _packetBuffer[_tail]->Get_packet().end())); + + _tail = (_tail + 1) % _bufferSize; // advance tail on circular buffer + } + + return p; +} + +bool PacketDatacast::IsReady(bool force) +{ + bool result=false; + + if (GetEvent(EVENT_DATABROADCAST)) + { + // Don't clear event, Service::_updateEvents explicitly turns it off for non datacast lines + + if (_tail != _head) + result = true; + else + result = force; + } + return result; +} diff --git a/packetDatacast.h b/packetDatacast.h new file mode 100644 index 0000000..294f2cd --- /dev/null +++ b/packetDatacast.h @@ -0,0 +1,34 @@ +#ifndef PACKETDATACAST_H +#define PACKETDATACAST_H + +#include "packetsource.h" +#include "configure.h" + +namespace vbit +{ + class PacketDatacast : public PacketSource + { + public: + PacketDatacast(uint8_t datachannel, ttx::Configure* configure); + virtual ~PacketDatacast(); + + Packet* GetPacket(Packet* p) override; + bool IsReady(bool force=false); + + int PushRaw(std::vector *data); + int PushIDLA(uint8_t flags, uint8_t ial, uint32_t spa, uint8_t ri, uint8_t ci, std::vector *data); + + protected: + private: + uint8_t _datachannel; + + std::vector _packetBuffer; + uint8_t _bufferSize; + uint8_t _head; + uint8_t _tail; + + Packet* GetFreeBuffer(); + }; +} + +#endif // PACKETDATACAST_H diff --git a/packetDebug.cpp b/packetDebug.cpp index 9add7a7..4df123a 100644 --- a/packetDebug.cpp +++ b/packetDebug.cpp @@ -70,9 +70,10 @@ Packet* PacketDebug::GetPacket(Packet* p) 6: debug stream data format version 7: clock flags b0: master clock resynchronisation - 8-11: current system clock - 12-15: program startup timestamp - 16-25: reserved + 8-11: current system clock seconds + 12: current system clock field + 13-16: program startup timestamp + 17-25: reserved */ _debugType = FORMAT2; // cycle to format 2 on next packet @@ -88,6 +89,7 @@ Packet* PacketDebug::GetPacket(Packet* p) data.push_back(_systemClock >> 16); data.push_back(_systemClock >> 8); data.push_back(_systemClock); + data.push_back(_systemClockFields); // vbit2 startup timestamp data.push_back(_startupTime >> 24); @@ -95,7 +97,7 @@ Packet* PacketDebug::GetPacket(Packet* p) data.push_back(_startupTime >> 8); data.push_back(_startupTime); - // 16 of 26 bytes used + // 17 of 26 bytes used break; } @@ -147,7 +149,7 @@ bool PacketDebug::IsReady(bool force) { bool result=false; - if (GetEvent(EVENT_DATABROADCAST)) + if (GetEvent(EVENT_DATABROADCAST) && GetEvent(EVENT_FIELD)) { // Don't clear event, Service::_updateEvents explicitly turns it off for non datacast lines if (_debugType > FORMAT1) // we are in the middle of sending debug data already @@ -184,17 +186,20 @@ bool PacketDebug::IsReady(bool force) if (_clockFlags || _magFlags || force) // generate packets if any of the content has changed, or if we are forcing output to use this source as datacast filler result = true; } + + ClearEvent(EVENT_FIELD); // don't jam all datacast lines in a field } return result; } -void PacketDebug::TimeAndField(MasterClock::timeStruct masterClock, time_t systemClock, bool resync) +void PacketDebug::TimeAndField(MasterClock::timeStruct masterClock, time_t systemClock, uint8_t systemClockFields, bool resync) { // update the clocks in _debugData struct - called once per field by Service::_updateEvents() _masterClockSeconds = masterClock.seconds; _masterClockFields = masterClock.fields; _systemClock = systemClock; + _systemClockFields = systemClockFields; if (resync) _clockFlags |= CFLAGRESYNC; // set flag } diff --git a/packetDebug.h b/packetDebug.h index 3917e0f..ef579ba 100644 --- a/packetDebug.h +++ b/packetDebug.h @@ -8,7 +8,7 @@ #include "configure.h" #include "debug.h" -#define VBIT2_DEBUG_VERSION 0x01 // Debug packet version +#define VBIT2_DEBUG_VERSION 0x02 // Debug packet version namespace vbit { @@ -23,7 +23,7 @@ namespace vbit // overrides Packet* GetPacket(Packet* p) override; - void TimeAndField(MasterClock::timeStruct masterClock, time_t systemClock, bool resync); + void TimeAndField(MasterClock::timeStruct masterClock, time_t systemClock, uint8_t systemClockFields, bool resync); bool IsReady(bool force=false); @@ -41,6 +41,7 @@ namespace vbit time_t _masterClockSeconds; uint8_t _masterClockFields; time_t _systemClock; + uint8_t _systemClockFields; std::array _magDurations; std::array _magSizes; diff --git a/service.cpp b/service.cpp index d47b57a..dfa2103 100644 --- a/service.cpp +++ b/service.cpp @@ -5,11 +5,12 @@ using namespace ttx; using namespace vbit; -Service::Service(Configure *configure, vbit::Debug *debug, PageList *pageList, PacketServer *packetServer) : +Service::Service(Configure *configure, vbit::Debug *debug, PageList *pageList, PacketServer *packetServer, DatacastServer *datacastServer) : _configure(configure), _debug(debug), _pageList(pageList), _packetServer(packetServer), + _datacastServer(datacastServer), _fieldCounter(49) // roll over immediately { _magList=_pageList->GetMagazines(); @@ -27,6 +28,12 @@ Service::Service(Configure *configure, vbit::Debug *debug, PageList *pageList, P // register datacast sources _register(&_datacastSources, _packetDebug = new PacketDebug(_configure, _debug)); + vbit::PacketDatacast** channels = _datacastServer->GetDatachannels(); + for (int dc=1; dc<16; dc++) + { + _register(&_datacastSources, channels[dc]); + } + // don't register BSDP source _packet830 = new Packet830(_configure); @@ -55,7 +62,8 @@ int Service::run() { _debug->Log(Debug::LogLevels::logDEBUG,"[Service::run] This is the worker process"); - std::list::const_iterator iterator=_magazineSources.begin(); // Iterator for magazine packet sources + std::list::const_iterator magIterator=_magazineSources.begin(); // Iterator for magazine packet sources + std::list::const_iterator dcIterator=_datacastSources.begin(); // Iterator for datacast sources vbit::Packet* pkt=new vbit::Packet(8,25," "); // This just allocates storage. @@ -81,7 +89,7 @@ int Service::run() } } // Special case for debug. Ensures it can steal lines from other sources during DATABROADCAST event - else if (_packetDebug->IsReady(_datacastLines)) // force if log level DEBUG + else if (_packetDebug->IsReady(_debug->GetDebugLevel() == Debug::LogLevels::logDEBUG)) // force if log level is DEBUG { _packetDebug->GetPacket(pkt); _packetOutput(pkt); @@ -100,16 +108,45 @@ int Service::run() else { // Iterate through the packet sources until we get a packet to transmit - vbit::PacketSource* p; + vbit::PacketSource* p=nullptr; + + if (_datacastLines) + { + // try datacast sources first + for (unsigned int count = 0; count < _datacastSources.size()-1; count++) // iterate over sources once + { + // Get the packet source + p=(*dcIterator); + + if (++dcIterator==--_datacastSources.end()) // loop skipping last source (_packetDebug) + { + dcIterator=_datacastSources.begin(); + } + + if (p->IsReady(count==_datacastSources.size()-2)) // force if the last datacast source + break; // a datacast buffer has packets to go + else + p=nullptr; + } + if (p) + { + p->GetPacket(pkt); + _packetOutput(pkt); + continue; // main while loop + } + // else fall through to magazine sources + } + + // now try magazine sources uint8_t sourceCount=0; uint8_t listSize=_magazineSources.size(); bool force=false; do { // Loop back to the first source - if (iterator==_magazineSources.end()) + if (magIterator==_magazineSources.end()) { - iterator=_magazineSources.begin(); + magIterator=_magazineSources.begin(); } // If we have tried all sources with and without force, then break out with a filler to prevent a deadlock @@ -127,38 +164,62 @@ int Service::run() } // Get the packet source - p=(*iterator); - ++iterator; + p=(*magIterator); + ++magIterator; sourceCount++; // Count how many sources we tried. } while (!p->IsReady(force)); - // Did we find a packet? Then send it otherwise put out a filler + // Did we find a packet? if (p) { // GetPacket returns nullptr if the pkt isn't valid - if it's null go round again. if (p->GetPacket(pkt) != nullptr) { _packetOutput(pkt); + continue; // main while loop } - else - { - _packetOutput(filler); - } + // else fall through to filler } - else + + if (!_datacastLines) { - _packetOutput(filler); + // output datacast in place of filler + for (unsigned int count = 0; count < _datacastSources.size()-1; count++) // iterate over sources once + { + // Get the packet source + p=(*dcIterator); + + if (++dcIterator==--_datacastSources.end()) // loop skipping last source (_packetDebug) + { + dcIterator=_datacastSources.begin(); + } + + if (p->IsReady()) + { + p->GetPacket(pkt); + break; + } + else + p=nullptr; + } + + if (p) + { + _packetOutput(pkt); + continue; // main while loop + } + // else fall through to filler } + + _packetOutput(filler); } } // while forever return 99; // can't return but this keeps the compiler happy } // worker -#define FORWARDSBUFFER 1 // how far into the future vbit2 should run before rate limiting in seconds - void Service::_updateEvents() { vbit::MasterClock *mc = mc->Instance(); @@ -167,16 +228,19 @@ void Service::_updateEvents() // Step the counters _lineCounter = (_lineCounter + 1) % _linesPerField; + auto t1 = std::chrono::system_clock::now(); + auto duration = t1.time_since_epoch(); + int64_t fields = std::chrono::duration_cast(duration).count() / 20; + + time_t now = fields / 50; + + if (((int64_t)masterClock.seconds * 50 + masterClock.fields) > fields) + std::this_thread::sleep_for(std::chrono::milliseconds(40)); // back off for ≈2 fields to limit output to (less than) 50 fields per second + if (_lineCounter == 0) // new field { _fieldCounter = (_fieldCounter + 1) % 50; - time_t now; - time(&now); - - if (masterClock.seconds > now + FORWARDSBUFFER) // allow vbit2 to run into the future before limiting packet rate - std::this_thread::sleep_for(std::chrono::milliseconds(40)); // back off for ≈2 fields to limit output to (less than) 50 fields per second - if (_fieldCounter == 0) { masterClock.seconds++; // step the master clock before updating debug packet @@ -184,12 +248,12 @@ void Service::_updateEvents() masterClock.fields = _fieldCounter; - _packetDebug->TimeAndField(masterClock, now, false); // update the clocks in debugPacket. + _packetDebug->TimeAndField(masterClock, now, fields%50, false); // update the clocks in debugPacket. if (_fieldCounter == 0) { - // if internal master clock is behind real time, or too far ahead, resynchronise it. - if (masterClock.seconds < now || masterClock.seconds > now + FORWARDSBUFFER + 1) + // if internal master clock is behind real time, or more than 1 second ahead, resynchronise it. + if (masterClock.seconds < now || masterClock.seconds > now + 1) { masterClock.seconds = now; @@ -198,7 +262,7 @@ void Service::_updateEvents() for (int i=0;i<8;i++) _magList[i]->InvalidateCycleTimestamp(); // reset magazine cycle duration calculations - _packetDebug->TimeAndField(masterClock, now, true); // update the clocks in debugPacket. + _packetDebug->TimeAndField(masterClock, now, fields%50, true); // update the clocks in debugPacket. } if (masterClock.seconds%15==0) // TODO: how often do we want to trigger sending special packets? @@ -219,6 +283,8 @@ void Service::_updateEvents() (*iterator)->SetEvent(EVENT_FIELD); } + _packetDebug->SetEvent(EVENT_FIELD); + if (_fieldCounter%10==0) // Packet 830 happens every 200ms. { Event ev=EVENT_P830_FORMAT_1; diff --git a/service.h b/service.h index ace5d72..8d0a5fd 100644 --- a/service.h +++ b/service.h @@ -11,6 +11,7 @@ #include "debug.h" #include "pagelist.h" #include "packetServer.h" +#include "datacastServer.h" #include "packet.h" #include "packetsource.h" #include "packetmag.h" @@ -37,7 +38,7 @@ namespace ttx * @param configure A Configure object with all the settings * @param pageList A pageList object already loaded with pages */ - Service(Configure* configure, vbit::Debug* debug, PageList* pageList, PacketServer* packetServer); + Service(Configure* configure, vbit::Debug* debug, PageList* pageList, PacketServer* packetServer, DatacastServer *datacastServer); ~Service(); @@ -59,6 +60,7 @@ namespace ttx PageList* _pageList; /// Member reference to the pages list vbit::PacketMag** _magList; PacketServer* _packetServer; + DatacastServer* _datacastServer; // Member variables for event management uint16_t _linesPerField; diff --git a/vbit2.cpp b/vbit2.cpp index 4aee32d..d6ba439 100644 --- a/vbit2.cpp +++ b/vbit2.cpp @@ -55,8 +55,9 @@ int main(int argc, char** argv) Configure *configure=new Configure(debug, argc, argv); PageList *pageList=new PageList(configure, debug); PacketServer *packetServer=new PacketServer(configure, debug); + DatacastServer *datacastServer=new DatacastServer(configure, debug); - Service* svc=new Service(configure, debug, pageList, packetServer); // Need to copy the subtitle packet source for Newfor + Service* svc=new Service(configure, debug, pageList, packetServer, datacastServer); // Need to copy the subtitle packet source for Newfor std::thread monitorThread(&FileMonitor::run, FileMonitor(configure, debug, pageList)); std::thread serviceThread(&Service::run, svc); @@ -75,6 +76,13 @@ int main(int argc, char** argv) packetServerThread.detach(); } + if (configure->GetDatacastServerEnabled()) + { + // only start datacast server thread if required + std::thread datacastServerThread(&DatacastServer::run, datacastServer ); + datacastServerThread.detach(); + } + // The threads should never stop, but just in case... monitorThread.join(); serviceThread.join(); diff --git a/vbit2.h b/vbit2.h index 9d7f9c3..05d4778 100644 --- a/vbit2.h +++ b/vbit2.h @@ -10,6 +10,7 @@ #include "filemonitor.h" #include "command.h" #include "packetServer.h" +#include "datacastServer.h" #include "masterClock.h" #ifdef WIN32