diff --git a/Source/Processors/EventTranslator/EventTranslator.cpp b/Source/Processors/EventTranslator/EventTranslator.cpp index ce11d4f1d2..0cb90f0e42 100644 --- a/Source/Processors/EventTranslator/EventTranslator.cpp +++ b/Source/Processors/EventTranslator/EventTranslator.cpp @@ -121,21 +121,18 @@ void EventTranslator::handleTTLEvent(TTLEventPtr event) const uint16 eventStream = event->getStreamId(); const int ttlLine = event->getLine(); const int64 sampleNumber = event->getSampleNumber(); + const bool state = event->getState(); if (synchronizer.getSyncLine(eventStream) == ttlLine) { - synchronizer.addEvent(eventStream, ttlLine, sampleNumber); + synchronizer.addEvent(eventStream, ttlLine, sampleNumber, state); return; } - if (eventStream == synchronizer.mainStreamId && synchronizer.isStreamSynced(eventStream)) + if (eventStream == synchronizer.mainstreamId && synchronizer.isStreamSynced(eventStream)) { - - //std::cout << "TRANSLATE!" << std::endl; - - const bool state = event->getState(); - + double timestamp = synchronizer.convertSampleNumberToTimestamp(eventStream, sampleNumber); for (auto stream : getDataStreams()) @@ -182,7 +179,7 @@ void EventTranslator::saveCustomParametersToXml(XmlElement* xml) XmlElement* streamXml = xml->createNewChildElement("STREAM"); - streamXml->setAttribute("isMainStream", synchronizer.mainStreamId == streamId); + streamXml->setAttribute("isMainStream", synchronizer.mainstreamId == streamId); streamXml->setAttribute("sync_line", getSyncLine(streamId)); streamXml->setAttribute("name", stream->getName()); streamXml->setAttribute("source_node_id", stream->getSourceNodeId()); diff --git a/Source/Processors/RecordNode/RecordNode.cpp b/Source/Processors/RecordNode/RecordNode.cpp index 0bfcdc0a58..b8ff0321ff 100755 --- a/Source/Processors/RecordNode/RecordNode.cpp +++ b/Source/Processors/RecordNode/RecordNode.cpp @@ -229,11 +229,11 @@ void RecordNode::handleBroadcastMessage(String msg) if (recordEvents && isRecording) { - int64 messageSampleNumber = getFirstSampleNumberForBlock(synchronizer.mainStreamId); + int64 messageSampleNumber = getFirstSampleNumberForBlock(synchronizer.mainstreamId); TextEventPtr event = TextEvent::createTextEvent(getMessageChannel(), messageSampleNumber, msg); - double ts = synchronizer.convertSampleNumberToTimestamp(synchronizer.mainStreamId, messageSampleNumber); + double ts = synchronizer.convertSampleNumberToTimestamp(synchronizer.mainstreamId, messageSampleNumber); event->setTimestampInSeconds(ts); @@ -578,7 +578,7 @@ bool RecordNode::startAcquisition() { eventChannels.add(new EventChannel(*messageChannel)); eventChannels.getLast()->addProcessor(processorInfo.get()); - eventChannels.getLast()->setDataStream(getDataStream(synchronizer.mainStreamId), false); + eventChannels.getLast()->setDataStream(getDataStream(synchronizer.mainstreamId), false); } return true; @@ -628,7 +628,7 @@ void RecordNode::startRecording() { eventChannels.add(new EventChannel(*messageChannel)); eventChannels.getLast()->addProcessor(processorInfo.get()); - eventChannels.getLast()->setDataStream(getDataStream(synchronizer.mainStreamId), false); + eventChannels.getLast()->setDataStream(getDataStream(synchronizer.mainstreamId), false); } int lastSourceNodeId = -1; @@ -747,7 +747,7 @@ void RecordNode::handleTTLEvent(TTLEventPtr event) int64 sampleNumber = event->getSampleNumber(); - synchronizer.addEvent(event->getStreamId(), event->getLine(), sampleNumber); + synchronizer.addEvent(event->getStreamId(), event->getLine(), sampleNumber, event->getState()); if (recordEvents && isRecording) { @@ -1016,7 +1016,7 @@ void RecordNode::saveCustomParametersToXml(XmlElement* xml) { XmlElement* streamXml = xml->createNewChildElement("STREAM"); - streamXml->setAttribute("isMainStream", synchronizer.mainStreamId == streamId); + streamXml->setAttribute("isMainStream", synchronizer.mainstreamId == streamId); streamXml->setAttribute("sync_line", getSyncLine(streamId)); streamXml->setAttribute("name", stream->getName()); streamXml->setAttribute("source_node_id", stream->getSourceNodeId()); diff --git a/Source/Processors/RecordNode/SyncControlButton.cpp b/Source/Processors/RecordNode/SyncControlButton.cpp index 61d452566d..2da17ccfbf 100644 --- a/Source/Processors/RecordNode/SyncControlButton.cpp +++ b/Source/Processors/RecordNode/SyncControlButton.cpp @@ -41,7 +41,7 @@ SyncControlButton::SyncControlButton(SynchronizingProcessor* node_, isPrimary = node->isMainDataStream(streamId); LOGD("SyncControlButton::Constructor; Stream: ", streamId, " is main stream: ", isPrimary); - startTimer(250); + startTimer(500); setTooltip(name); @@ -113,9 +113,11 @@ void SyncControlButton::paintButton(Graphics &g, bool isMouseOver, bool isButton g.fillRoundedRectangle(0,0,getWidth(),getHeight(),4); switch(node->synchronizer.getStatus(streamId)) { - + case SyncStatus::OFF : { + //LOGC("Get status: ", streamId, " SYNC_OFF"); + if (isMouseOver) { //LIGHT GREY @@ -130,6 +132,7 @@ void SyncControlButton::paintButton(Graphics &g, bool isMouseOver, bool isButton } case SyncStatus::SYNCING : { + //LOGC("Get status: ", streamId, " SYNCING"); if (isMouseOver) { @@ -145,6 +148,7 @@ void SyncControlButton::paintButton(Graphics &g, bool isMouseOver, bool isButton } case SyncStatus::SYNCED : { + //LOGC("Get status: ", streamId, " SYNCED"); if (isMouseOver) { diff --git a/Source/Processors/RecordNode/Synchronizer.cpp b/Source/Processors/RecordNode/Synchronizer.cpp index 2cda96db1b..8583517839 100644 --- a/Source/Processors/RecordNode/Synchronizer.cpp +++ b/Source/Processors/RecordNode/Synchronizer.cpp @@ -1,371 +1,470 @@ /* ------------------------------------------------------------------- + ------------------------------------------------------------------ -This file is part of the Open Ephys GUI -Copyright (C) 2022 Open Ephys + This file is part of the Open Ephys GUI + Copyright (C) 2024 Open Ephys ------------------------------------------------------------------- + ------------------------------------------------------------------ -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. -You should have received a copy of the GNU General Public License -along with this program. If not, see . + You should have received a copy of the GNU General Public License + along with this program. If not, see . */ #include "Synchronizer.h" -Stream::Stream(uint16 streamId_, float expectedSampleRate_) - : streamId(streamId_), - expectedSampleRate(expectedSampleRate_), - actualSampleRate(-1.0f), - sampleRateTolerance(0.01f), - isActive(true) +SyncStream::SyncStream(uint16 streamId_, float expectedSampleRate_) + : streamId(streamId_), + expectedSampleRate(expectedSampleRate_), + actualSampleRate(-1.0f), + isActive(true) { +} + +void SyncStream::reset(uint16 mainstreamId) +{ + isMainStream = (streamId == mainstreamId); + + pulses.clear(); + firstMatchingPulse = SyncPulse(); - reset(); + latestSyncSampleNumber = 0; + latestGlobalSyncTime = 0.0; + latestSyncMillis = -1; + + if (isMainStream) + { + actualSampleRate = expectedSampleRate; + globalStartTime = 0.0; + isSynchronized = true; + } + else + { + actualSampleRate = -1.0; + globalStartTime = -1.0; + isSynchronized = false; + } } -void Stream::reset() +void SyncStream::addEvent(int64 sampleNumber, bool state) { + //LOGD ("[+] Adding event for stream ", streamId, " (", sampleNumber, ")"); - startSampleMainTime = -1.0f; - lastSampleMainTime = -1.0f; + if (state) // on event received, pulse initiated + { + SyncPulse latestPulse; + latestPulse.localSampleNumber = sampleNumber; + latestPulse.localTimestamp = sampleNumber / expectedSampleRate; + latestPulse.computerTimeMillis = Time::currentTimeMillis(); - actualSampleRate = -1.0f; - startSample = -1; - lastSample = -1; + pulses.insert(pulses.begin(), latestPulse); + } + else // off event received, pulse terminated + { + if (pulses.size() > 0) + { + SyncPulse& latestPulse = pulses.front(); + + latestPulse.complete = true; + latestPulse.duration = + (sampleNumber / expectedSampleRate) - latestPulse.localTimestamp; + + if (pulses.size() > 1) + { + latestPulse.interval = latestPulse.localTimestamp - pulses[1].localTimestamp; + } + } + + if (pulses.size() > MAX_PULSES_IN_BUFFER) + { + pulses.pop_back(); + } + } +} + +double SyncStream::getLatestSyncTime() +{ + //LOGD ("Getting latest sync time for stream ", streamId, "..."); + //LOGD ("Time::currentTimeMillis(): ", Time::currentTimeMillis()); + //LOGD ("latestSyncMillis: ", latestSyncMillis); - receivedEventInWindow = false; - receivedMainTimeInWindow = false; - isSynchronized = false; + if (latestSyncMillis != -1) + { + // LOGD ("Returning: ", double (Time::currentTimeMillis() - latestSyncMillis) / 1000.0f); + return double(Time::currentTimeMillis() - latestSyncMillis) / 1000.0f; + } + else + { + // LOGD ("Returning: ", -1); + return -1.0; + } } -void Stream::setMainTime(float time) +double SyncStream::getSyncAccuracy() { - if (!receivedMainTimeInWindow) - { - tempMainTime = time; - receivedMainTimeInWindow = true; - //LOGD("Stream ", streamId, " received main time: ", time); + if (pulses.size() > 0) + { + + //LOGD ("Sync accuracy for stream ", streamId); - } - else { // multiple events, something could be wrong - receivedMainTimeInWindow = false; - } + //LOGD ("latestSyncSampleNumber: ", latestSyncSampleNumber); + //LOGD ("latestGlobalSyncTime: ", latestGlobalSyncTime); + //LOGD ("globalStartTime: ", globalStartTime); + //LOGD ("actualSampleRate: ", actualSampleRate); + double estimatedGlobalTime = latestSyncSampleNumber / actualSampleRate + globalStartTime; + //LOGD ("estimatedGlobalTime: ", estimatedGlobalTime); + //LOGD ("difference: ", latestGlobalSyncTime - estimatedGlobalTime); + return (estimatedGlobalTime - latestGlobalSyncTime) * 1000; + } + else + { + return 0.0; + } } -void Stream::addEvent(int64 sampleNumber) +void SyncStream::syncWith(const SyncStream* mainStream) { - //LOGD("[+] Adding event for stream ", streamId, " (", sampleNumber, ")"); - - if (!receivedEventInWindow) - { - tempSampleNum = sampleNumber; - receivedEventInWindow = true; - } - else { // multiple events, something could be wrong - receivedEventInWindow = false; - } + //LOGD ("Synchronizing ", streamId, " with ", mainStream->streamId, "...") + + if (mainStream->pulses.size() < 2 || pulses.size() < 2) + { + //LOGD ("Not enough pulses to synchronize."); + return; + } + + int localIndex = 0; + bool foundMatchingPulse = false; + + for (auto& pulse : pulses) // loop through pulses in this stream + { + if (pulse.complete) + { + int index = 0; + + for (auto& mainPulse : mainStream->pulses) // loop through pulses in main stream + { + if (mainPulse.complete) + { + if (comparePulses(pulse, mainPulse)) // putative match + { + if (pulses.size() > localIndex + 3 && mainStream->pulses.size() > index + 3) + { + if (comparePulses(pulses[localIndex + 1], mainStream->pulses[index + 1]) + && comparePulses(pulses[localIndex + 2], mainStream->pulses[index + 2]) + && comparePulses(pulses[localIndex + 3], mainStream->pulses[index + 3])) + { + pulse.matchingPulseIndex = index; + pulse.globalTimestamp = mainPulse.localTimestamp; + latestSyncSampleNumber = pulse.localSampleNumber; + latestGlobalSyncTime = pulse.globalTimestamp; + latestSyncMillis = pulse.computerTimeMillis; + //LOGD ("Pulse at ", pulse.localTimestamp, " matches with 4 main pulses at ", index); + //LOGD ("latestSyncSampleNumber: ", latestSyncSampleNumber, ", latestGlobalSyncTime: ", latestGlobalSyncTime); + + + if (firstMatchingPulse.complete == false) + { + firstMatchingPulse.localTimestamp = pulse.localTimestamp; + firstMatchingPulse.globalTimestamp = mainPulse.localTimestamp; + firstMatchingPulse.localSampleNumber = pulse.localSampleNumber; + firstMatchingPulse.complete = true; + //LOGD ("Time of first matching pulse: ", firstMatchingPulse.localTimestamp, " (local), ", firstMatchingPulse.globalTimestamp, " (global)"); + } + } + } + + break; + } + } + + index++; + } + } + + if (pulse.matchingPulseIndex != -1) + { + foundMatchingPulse = true; + break; + } + + localIndex++; + } + + if (foundMatchingPulse) + { + if (firstMatchingPulse.complete && (pulses[localIndex].localTimestamp - firstMatchingPulse.localTimestamp) > 1.0) + { + //LOGD ("pulses[localIndex].localSampleNumber: ", pulses[localIndex].localSampleNumber, ", firstMatchingPulse.localSampleNumber: ", firstMatchingPulse.localSampleNumber); + //LOGD ("pulses[localIndex].localTimestamp: ", pulses[localIndex].localTimestamp, ", firstMatchingPulse.localTimestamp: ", firstMatchingPulse.localTimestamp); + //LOGD ("pulses[localIndex].globalTimestamp: ", pulses[localIndex].globalTimestamp, ", firstMatchingPulse.globalTimestamp: ", firstMatchingPulse.globalTimestamp); + + float estimatedActualSampleRate = (pulses[localIndex].localSampleNumber - firstMatchingPulse.localSampleNumber) / (pulses[localIndex].globalTimestamp - firstMatchingPulse.globalTimestamp); + + double estimatedGlobalStartTime = pulses[localIndex].globalTimestamp - pulses[localIndex].localSampleNumber / actualSampleRate; + + if (std::abs(estimatedActualSampleRate - expectedSampleRate) / expectedSampleRate < 0.001) + { + actualSampleRate = estimatedActualSampleRate; + + if (std::abs(estimatedGlobalStartTime) < 0.1) + { + if (!isSynchronized) + + { + globalStartTime = estimatedGlobalStartTime; + isSynchronized = true; + } + } + else + { + //LOGD ("Estimated global start time of ", estimatedGlobalStartTime, " is out of bounds. Ignoring.") + } + } + else + { + //LOGD ("Estimated sample rate of ", estimatedActualSampleRate, " is out of bounds. Ignoring."); + return; + } + + //LOGD ("Stream ", streamId, " synchronized with main stream. Sample rate: ", actualSampleRate, ", start time: ", globalStartTime); + } + else + { + //LOGD ("At least 1 second must elapse before synchronization can be attempted."); + } + } + } -void Stream::closeSyncWindow() +bool SyncStream::comparePulses(const SyncPulse& pulse1, const SyncPulse& pulse2) { + if (std::abs(pulse1.computerTimeMillis - pulse2.computerTimeMillis) < MAX_TIME_DIFFERENCE_MS) + { + if (std::abs(pulse1.duration - pulse2.duration) < MAX_DURATION_DIFFERENCE_MS) + { + if (std::abs(pulse1.interval - pulse2.interval) < MAX_INTERVAL_DIFFERENCE_MS) + { + return true; + } + } + } - //LOGC("Stream ", streamId, " Closing Sync Window...receivedEvent: ", receivedEventInWindow, ", receivedMainTime: ", receivedMainTimeInWindow); - - if (receivedEventInWindow && receivedMainTimeInWindow) - { - if (startSample < 0) - { - startSample = tempSampleNum; - startSampleMainTime = tempMainTime; - } - else { - - lastSample = tempSampleNum; - lastSampleMainTime = tempMainTime; - - double tempSampleRate = (lastSample - startSample) / (lastSampleMainTime - startSampleMainTime); - - if (actualSampleRate < 0.0f) - { - actualSampleRate = tempSampleRate; - isSynchronized = true; - //LOGC("Stream ", streamId, " new sample rate: ", actualSampleRate); - } - else { - // check whether the sample rate has changed - if (abs((tempSampleRate - actualSampleRate) / actualSampleRate) < sampleRateTolerance) - { - actualSampleRate = tempSampleRate; - isSynchronized = true; - //LOGC("Stream ", streamId, " UPDATED sample rate: ", actualSampleRate); - - } - else - { // reset the clock - actualSampleRate = -1.0f; - startSample = tempSampleNum; - startSampleMainTime = tempMainTime; - isSynchronized = false; - //LOGC("Stream ", streamId, " NO LONGER SYNCHRONIZED."); - - } - } - } - } - - //LOGD("[x] Stream ", streamId, " closed sync window."); - - receivedEventInWindow = false; - receivedMainTimeInWindow = false; + return false; } // ======================================================= Synchronizer::Synchronizer() - : syncWindowLengthMs(50), - syncWindowIsOpen(false), - firstMainSyncEvent(false), - mainStreamId(0), - previousMainStreamId(0), - streamCount(0), - acquisitionIsActive(false) { } void Synchronizer::reset() { - - syncWindowIsOpen = false; - firstMainSyncEvent = true; - eventCount = 0; - - if (streamCount == 1) - { - streams[mainStreamId]->actualSampleRate = streams[mainStreamId]->expectedSampleRate; - streams[mainStreamId]->isSynchronized = true; - streams[mainStreamId]->startSampleMainTime = 0.0; - streams[mainStreamId]->startSample = 0; - LOGD("Only one stream, setting as synchronized."); - } else { - for (auto [id, stream] : streams) - stream->reset(); - } - + for (auto [id, stream] : streams) + stream->reset(mainstreamId); } void Synchronizer::prepareForUpdate() { - previousMainStreamId = mainStreamId; - mainStreamId = 0; - streamCount = 0; + previousMainstreamId = mainstreamId; - for (auto [id, stream] : streams) - stream->isActive = false; + streamCount = 0; + + for (auto [id, stream] : streams) + stream->isActive = false; } void Synchronizer::finishedUpdate() { - } - void Synchronizer::addDataStream(uint16 streamId, float expectedSampleRate) { - - //std::cout << "Synchronizer adding " << streamId << std::endl; - // if this is the first stream, make it the main one - if (mainStreamId == 0) - mainStreamId = streamId; - + // std::cout << "Synchronizer adding " << streamId << std::endl; + // if this is the first stream, make it the main one + if (mainstreamId == 0) + mainstreamId = streamId; + //std::cout << "Main stream ID: " << mainStreamId << std::endl; - // if there's a stored value, and it appears again, - // re-instantiate this as the main stream - if (streamId == previousMainStreamId) - mainStreamId = previousMainStreamId; + // if there's a stored value, and it appears again, + // re-instantiate this as the main stream + if (mainstreamId == previousMainstreamId) + mainstreamId = previousMainstreamId; - // if there's no Stream object yet, create a new one - if (streams.count(streamId) == 0) - { + // if there's no Stream object yet, create a new one + if (streams.count(streamId) == 0) + { //std::cout << "Creating new Stream object" << std::endl; - dataStreamObjects.add(new Stream(streamId, expectedSampleRate)); - streams[streamId] = dataStreamObjects.getLast(); - setSyncLine(streamId, 0); - } else { - // otherwise, indicate that the stream is currently active - streams[streamId]->isActive = true; - } - - streamCount++; + dataStreamObjects.add(new SyncStream(streamId, expectedSampleRate)); + streams[streamId] = dataStreamObjects.getLast(); + setSyncLine(streamId, 0); + } + else + { + // otherwise, indicate that the stream is currently active + streams[streamId]->isActive = true; + } + streamCount++; } void Synchronizer::setMainDataStream(uint16 streamId) { - mainStreamId = streamId; - reset(); + mainstreamId = streamId; + reset(); } void Synchronizer::setSyncLine(uint16 streamId, int ttlLine) { - streams[streamId]->syncLine = ttlLine; - - if (streamId == mainStreamId) + streams[streamId]->syncLine = ttlLine; + + if (streamId == mainstreamId) reset(); else - streams[streamId]->reset(); + streams[streamId]->reset(mainstreamId); } int Synchronizer::getSyncLine(uint16 streamId) { - return streams[streamId]->syncLine; + return streams[streamId]->syncLine; } void Synchronizer::startAcquisition() { - acquisitionIsActive = true; - + + LOGC("Synchronizer starting acquisition..."); + reset(); + + acquisitionIsActive = true; + + startTimer(1000); } void Synchronizer::stopAcquisition() { acquisitionIsActive = false; + + stopTimer(); } -void Synchronizer::addEvent(uint16 streamId, int ttlLine, int64 sampleNumber) +void Synchronizer::addEvent(uint16 streamId, + int ttlLine, + int64 sampleNumber, + bool state) { + const ScopedLock sl(synchronizerLock); - if (streamCount == 1 || sampleNumber < 1000) - return; - - //LOGC("Synchronizer received sync event for stream ", streamId, ", sampleNumber: ", sampleNumber); - - if (streams[streamId]->syncLine == ttlLine) - { - - //LOGC("Correct line!"); - - if (!syncWindowIsOpen) - { - openSyncWindow(); - } - - streams[streamId]->addEvent(sampleNumber); - - if (streamId == mainStreamId) - { + if (streamCount == 1 || sampleNumber < 1000) + return; - float mainTimeSec; - - if (!firstMainSyncEvent) - { - mainTimeSec = (sampleNumber - streams[streamId]->startSample) / streams[streamId]->expectedSampleRate; - } - else - { - mainTimeSec = 0.0f; - firstMainSyncEvent = false; - } - - for (auto [id, stream] : streams) - { - if (stream->isActive) - stream->setMainTime(mainTimeSec); - } - - //LOGC("[M] Main time: ", mainTimeSec); - - eventCount++; - } - - //LOGC("[T] Estimated time: ", convertSampleNumberToTimestamp(streamId, sampleNumber)); - //LOGC("[S] Is synchronized: ", streams[streamId]->isSynchronized); - //LOGC(" "); - - } + if (streams[streamId]->syncLine == ttlLine) + { + streams[streamId]->addEvent(sampleNumber, state); + } } double Synchronizer::convertSampleNumberToTimestamp(uint16 streamId, int64 sampleNumber) { - - if (streams[streamId]->isSynchronized) - { - return (double)(sampleNumber - streams[streamId]->startSample) / - streams[streamId]->actualSampleRate + - streams[streamId]->startSampleMainTime; - } - else { - return (double) -1.0f; - } + if (streams[streamId]->isSynchronized) + { + return (double)sampleNumber / streams[streamId]->actualSampleRate + streams[streamId]->globalStartTime; + } + else + { + return (double)-1.0f; + } } int64 Synchronizer::convertTimestampToSampleNumber(uint16 streamId, double timestamp) { - if (streams[streamId]->isSynchronized) { - double t = (timestamp - streams[streamId]->startSampleMainTime) * streams[streamId]->actualSampleRate + streams[streamId]->startSample; - - return (int64) t; + double t = (timestamp - streams[streamId]->globalStartTime) * streams[streamId]->actualSampleRate; + + return (int64)t; } - else { + else + { return -1; } } -void Synchronizer::openSyncWindow() +double Synchronizer::getStartTime(uint16 streamId) { - startTimer(syncWindowLengthMs); + return streams[streamId]->globalStartTime * 1000; +} + +double Synchronizer::getLastSyncEvent(uint16 streamId) +{ + return streams[streamId]->getLatestSyncTime(); +} + +double Synchronizer::getAccuracy(uint16 streamId) +{ + + if (!streams[streamId]->isSynchronized) + return 0.0; + else + { + if (streamId == mainstreamId) + return 0.0; + else + { + return streams[streamId]->getSyncAccuracy(); + } + + } - syncWindowIsOpen = true; } bool Synchronizer::isStreamSynced(uint16 streamId) { - return streams[streamId]->isSynchronized; + return streams[streamId]->isSynchronized; } SyncStatus Synchronizer::getStatus(uint16 streamId) { - if (streamId <= 0 || !acquisitionIsActive) + if (streamId <= 0 || !acquisitionIsActive) + { return SyncStatus::OFF; - - if (isStreamSynced(streamId)) - return SyncStatus::SYNCED; - else - return SyncStatus::SYNCING; - + } + + if (isStreamSynced(streamId)) + return SyncStatus::SYNCED; + else + return SyncStatus::SYNCING; } void Synchronizer::hiResTimerCallback() { - stopTimer(); - - syncWindowIsOpen = false; - for (auto [id, stream] : streams) - stream->closeSyncWindow(); + const ScopedLock sl(synchronizerLock); - //LOGD(" "); + for (auto [key, stream] : streams) + { + if (key != mainstreamId) + { + stream->syncWith(streams[mainstreamId]); + } + } } - // called by RecordNodeEditor (when loading), SyncControlButton void SynchronizingProcessor::setMainDataStream(uint16 streamId) { @@ -388,5 +487,5 @@ int SynchronizingProcessor::getSyncLine(uint16 streamId) // called by SyncControlButton bool SynchronizingProcessor::isMainDataStream(uint16 streamId) { - return (streamId == synchronizer.mainStreamId); + return (streamId == synchronizer.mainstreamId); } diff --git a/Source/Processors/RecordNode/Synchronizer.h b/Source/Processors/RecordNode/Synchronizer.h index e9b329d77c..a671510738 100644 --- a/Source/Processors/RecordNode/Synchronizer.h +++ b/Source/Processors/RecordNode/Synchronizer.h @@ -35,32 +35,75 @@ along with this program. If not, see . +/** + + Represents a single sync pulse + +*/ +struct SyncPulse +{ + /** The time (in seconds) since the start of acquisition + for the pulse's stream + */ + double localTimestamp; + + /** The sample number at which this event occurred */ + int64 localSampleNumber; + + /** The computer clock time at which this event was received + by the synchronizer */ + int64 computerTimeMillis; + + /** Whether the whole pulse has completed (on/off sequence) */ + bool complete = false; + + /** Duration of the event in seconds */ + double duration = -1.0; + + /** Time between the start of this event and the start of the last event */ + double interval = -1.0; + + /** Index of matching pulse in main stream */ + int matchingPulseIndex = -1; + + /** Global timestamp of pulse (if known) */ + double globalTimestamp = -1.0; +}; + /** * * Represents an incoming data stream * * */ -class Stream +class SyncStream { public: - /** Constructor */ - Stream(uint16 streamId, float expectedSampleRate); + SyncStream(uint16 streamId, float expectedSampleRate); /** Resets stream parameters before acquisition */ - void reset(); + void reset(uint16 streamId); + + /** True if this is the main stream */ + bool isMainStream; - /** Adds a sync event with a particular sample number*/ - void addEvent(int64 sampleNumber); + /** Adds a sync event with a particular sample number and state*/ + void addEvent(int64 sampleNumber, bool state); - /** Sets the main clock time for the last event */ - void setMainTime(float time); + /** Global start time of this stream */ + double globalStartTime; - /** Opens sync window (when event is received on any sync line) */ - void openSyncWindow(); + /** Returns time of latest sync pulse */ + double getLatestSyncTime(); - /** Closes sync window (after a delay) */ - void closeSyncWindow(); + /** Returns difference between actual and expected sync times */ + double getSyncAccuracy(); + + /** Synchronize this stream with another one */ + void syncWith(const SyncStream* mainStream); + + /** Compares pulses; returns true if a match is found */ + bool comparePulses(const SyncPulse& pulse1, const SyncPulse& pulse2); /** Stated sample rate for this stream */ float expectedSampleRate; @@ -68,54 +111,52 @@ class Stream /** Computed sample rate for this stream */ float actualSampleRate; - /** Sample index to which all future events are relative to*/ - int64 startSample; - - /** Sample index of most recent event */ - int64 lastSample; - /** TTL line to use for synchronization */ int syncLine; - /** true if this stream is succesfully synchronized */ + /** true if this stream is successfully synchronized */ bool isSynchronized; - /** true if a sync event was received in the latest window*/ - bool receivedEventInWindow; + /** Holds the unique key for this stream */ + uint16 streamId; + + /** true if the stream is in active use */ + bool isActive; - /** true if a main stream sync event was received in the latest window*/ - bool receivedMainTimeInWindow; + /** The sync pulses for this stream - /** Stores the latest sample number until the sync window is closed*/ - int64 tempSampleNum; + The latest pulse is added to the beginning of the vector + Expired pulses are removed from the end + */ + std::vector pulses; - /** Stores the latest main time until the sync window is closed */ - float tempMainTime; + /** First pulse matching the global stream */ + SyncPulse firstMatchingPulse; - /** Holds the main time of the start sample */ - float startSampleMainTime = -1.0f; + /** Determines the maximum size of the sync pulse buffer */ + const int MAX_PULSES_IN_BUFFER = 10; - /** Holds the main time of the last sample*/ - float lastSampleMainTime = -1.0f; + /** Threshold for calling pulses synchronous */ + const int MAX_TIME_DIFFERENCE_MS = 50; - /** If the sample rate changes by more than this amount, - * consider the stream desynchronized */ - float sampleRateTolerance; + /** Threshold of calling durations equal */ + const double MAX_DURATION_DIFFERENCE_MS = 2; - /** Holds the ID for this stream */ - uint16 streamId; + /** Threshold of calling intervals equal */ + const double MAX_INTERVAL_DIFFERENCE_MS = 2; - /** true if the stream is in active use */ - bool isActive; +private: + int64 latestSyncSampleNumber = 0; + double latestGlobalSyncTime = 0.0; + int64 latestSyncMillis = -1; }; -class RecordNode; - -enum SyncStatus { - OFF, //Synchronizer is not running - SYNCING, //Synchronizer is attempting to sync - SYNCED //Signal has been synchronized +enum SyncStatus +{ + OFF, //Synchronizer is not running + SYNCING, //Synchronizer is attempting to sync + SYNCED //Signal has been synchronized }; /** @@ -138,19 +179,27 @@ enum SyncStatus { class Synchronizer : public HighResolutionTimer { public: - /** Constructor*/ Synchronizer(); /** Destructor */ - ~Synchronizer() { } + ~Synchronizer() {} /** Converts an int64 sample number to a double timestamp */ double convertSampleNumberToTimestamp(uint16 streamId, int64 sampleNumber); - + /** Converts a double timestamp to an int64 sample number */ int64 convertTimestampToSampleNumber(uint16 streamId, double timestamp); + /** Returns offset (relative start time) for stream in ms */ + double getStartTime(uint16 streamId); + + /** Get latest sync time */ + double getLastSyncEvent(uint16 streamId); + + /** Get the accuracy of synchronization (difference between expected and actual event time) */ + double getAccuracy(uint16 streamId); + /** Resets all values when acquisition is re-started */ void reset(); @@ -179,45 +228,38 @@ class Synchronizer : public HighResolutionTimer /** Returns the status (OFF / SYNCING / SYNCED) of a given stream*/ SyncStatus getStatus(uint16 streamId); - /** Checks an event for a stream ID / line combination */ - void addEvent(uint16 streamId, int ttlLine, int64 sampleNumber); - + /** Adds an event for a stream ID / line combination */ + void addEvent(uint16 streamId, int ttlLine, int64 sampleNumber, bool state); + /** Signals start of acquisition */ void startAcquisition(); - + /** Signals start of acquisition */ void stopAcquisition(); - uint16 mainStreamId = 0; - uint16 previousMainStreamId = 0; - - /** Total number of streams*/ - int streamCount; + uint16 mainstreamId = 0; + uint16 previousMainstreamId; - bool isAvailable() { return mainStreamId > 0; }; + /** Total number of streams*/ + int streamCount = 0; private: - int eventCount = 0; - - float syncWindowLengthMs; - bool syncWindowIsOpen; - bool acquisitionIsActive; + bool acquisitionIsActive = false; void hiResTimerCallback(); - bool firstMainSyncEvent; + CriticalSection synchronizerLock; - std::map streams; - OwnedArray dataStreamObjects; + std::map streams; + OwnedArray dataStreamObjects; - void openSyncWindow(); }; /** - + Abstract base class for Record Node and Event Translator - + */ class SynchronizingProcessor { @@ -225,7 +267,7 @@ class SynchronizingProcessor /** Sets the main data stream to use for synchronization */ void setMainDataStream(uint16 streamId); - /** Returns true if a stream ID matches the one to use for sychronization*/ + /** Returns true if a stream ID matches the one to use for synchronization*/ bool isMainDataStream(uint16 streamId); /** Sets the TTL line to use for synchronization*/ @@ -233,7 +275,7 @@ class SynchronizingProcessor /** Returns the TTL line to use for synchronization*/ int getSyncLine(uint16 streamId); - + /** The synchronizer for this processor */ Synchronizer synchronizer; };