diff --git a/slsReceiverSoftware/src/FrameSynchronizerApp.cpp b/slsReceiverSoftware/src/FrameSynchronizerApp.cpp index 8a56c1d492..4af7e59897 100644 --- a/slsReceiverSoftware/src/FrameSynchronizerApp.cpp +++ b/slsReceiverSoftware/src/FrameSynchronizerApp.cpp @@ -10,13 +10,20 @@ #include "sls/sls_detector_defs.h" #include //SIGINT +#include #include #include +#include #include #include //wait #include +#include #include +#include +#include +#include + // gettid added in glibc 2.30 #if __GLIBC__ == 2 && __GLIBC_MINOR__ < 30 #include @@ -52,6 +59,137 @@ std::string getHelpMessage() { "for debugging), 0 for none (default)]\n\n"); } +void zmq_free (void *data, void *hint) +{ + free (data); +} + + +struct Status{ + bool terminate = false; + unsigned long num_receivers; + + sem_t available; + + std::mutex mtx; + + std::vector headers; + std::map > > frames; + +}; + +void print_frames(const std::map > > &frames) { + for (const auto& outer_pair : frames) { + unsigned int udpPort = outer_pair.first; + const auto& trigger_map = outer_pair.second; + + std::cout << "UDP port: " << udpPort << std::endl; + + for (const auto& inner_pair : trigger_map) { + long unsigned int acqIndex = inner_pair.first; + const auto& msg_vector = inner_pair.second; + + std::cout << " acq index: " << acqIndex << std::endl; + std::cout << " zmq_msg_t* Vector: "; + + // Iterate over the vector of zmq_msg_t* and print each message pointer + for (const auto& msg : msg_vector) { + std::cout << " a frame " << msg ; // Print a space between each pointer + } + + std::cout << std::endl; + } + } +} + +std::set find_keys(const std::map > >& maps) { + std::set all_keys; // Set to collect all unique keys across all maps + std::set valid_keys; // Set to store final valid keys + + // If no maps are provided, return empty set + if (maps.empty()) { + return valid_keys; + } + + // Collect all unique keys from all maps + std::cout << "Collecting all unique keys from the maps:\n"; + for (const auto& [port, trigger_map] : maps) { + std::cout << "Map " << port << ": "; + for (const auto& [idx, msgs] : trigger_map) { + all_keys.insert(idx); + std::cout << idx << " "; + } + std::cout << std::endl; + } + + std::cout << "All unique keys collected: "; + for (const auto& key : all_keys) { + std::cout << key << " "; + } + std::cout << "\n\n"; + + // Now check each key against all maps + for (const auto& key : all_keys) { + std::cout << "Checking key: " << key << std::endl; + bool is_valid = true; + for (const auto& [port, map] : maps) { + auto it = map.find(key); + if (it != map.end()) { + std::cout << " Key " << key << " found in map " << port << std::endl; + } else { + // Key is missing, check if the map has a larger key + std::cout << " Key " << key << " missing in map " << port; + auto upper_it = map.upper_bound(key); + if (upper_it != map.end()) { + std::cout << ", but found larger key: " << upper_it->first << std::endl; + } else { + std::cout << ", no larger key found. Key " << key << " is invalid.\n"; + is_valid = false; + break; + } + } + } + + if (is_valid) { + std::cout << " Key " << key << " is valid.\n\n"; + valid_keys.insert(key); + } else { + std::cout << " Key " << key << " is not valid.\n\n"; + } + } + + return valid_keys; +} + +void Correlate(Status *stat) { + bool starting = true; + std::vector ports; + while (!stat->terminate) { + sem_wait(&(stat->available)); + std::cout << "Correlate cache" << std::endl; + { + std::lock_guard lock(stat->mtx); + if (starting) { + if (stat->headers.size() == stat->num_receivers) { + std::cout << "got all start messages" << std::endl; + starting = false; + ports.clear(); + + } + } + else { + std::cout << "sending data, common keys" << std::endl; + //print_frames(stat->frames); + auto common_keys = find_keys(stat->frames); + for (const auto& key : common_keys) { + std::cout << key << " "; + } + std::cout << "\n\n"; + } + } + } +} + /** * Start Acquisition Call back (slsFrameSynchronizer writes data if file write * enabled) if registerCallBackRawDataReady or @@ -73,6 +211,38 @@ int StartAcq(const slsDetectorDefs::startCallbackHeader callbackHeader, << "\n\tFile Index : " << callbackHeader.fileIndex << "\n\tQuad Enable : " << callbackHeader.quad << "\n\t]"; + Status* stat = static_cast(objectPointer); + + std::ostringstream oss; + oss << "{\"htype\":\"header\"" + << ", \"udpPort\":" << sls::ToString(callbackHeader.udpPort) + << ", \"filePath\":" << callbackHeader.filePath + << "\"}\n"; + + std::string message = oss.str(); + int length = message.length(); + char* hdata = new char[length]; + + memcpy(hdata, message.c_str(), length); + zmq_msg_t *hmsg = new zmq_msg_t; + zmq_msg_init_data (hmsg, hdata, length, zmq_free, NULL); + + { + std::lock_guard lock(stat->mtx); + stat->headers.push_back(hmsg); + for(int port: callbackHeader.udpPort) { + std::cout << "clear cache for stream" << port << std::endl; + for (auto& pair : stat->frames[port]) { + std::cout << "clear data" << pair.first << std::endl; + for (auto msg : pair.second) { + zmq_msg_close(msg); + free(msg); + } + } + stat->frames[port].clear(); + } + } + sem_post(&stat->available); return 0; } @@ -149,8 +319,55 @@ void GetData(slsDetectorDefs::sls_receiver_header &header, // header->packetsMask.to_string().c_str(), ((uint8_t)(*((uint8_t *)(dataPointer)))), imageSize); + Status* stat = static_cast(objectPointer); + + std::ostringstream oss; + oss << "{\"htype\":\"module\"" + << ", \"port\":" << callbackHeader.udpPort + << "\"}\n"; + + std::string message = oss.str(); + int length = message.length(); + + char* hdata = new char[length]; + + memcpy(hdata, message.c_str(), length); + std::cout << callbackHeader.udpPort << ":creating json part" << std::endl; + zmq_msg_t *hmsg = new zmq_msg_t; + + zmq_msg_init_data (hmsg, hdata, length, zmq_free, NULL); + + std::cout << callbackHeader.udpPort << "created header frame" << std::endl; + //zmq_msg_init_buffer (&hmsg, message, length); + + char* data = new char[imageSize]; + + std::cout << callbackHeader.udpPort << "allocated new buffer" << std::endl; + + //printf("data pointer %x, data %x\n", dataPointer, ) + memcpy(data, dataPointer, imageSize); + + std::cout << callbackHeader.udpPort << "copied buffer" << std::endl; + zmq_msg_t *msg = new zmq_msg_t; + zmq_msg_init_data (msg, data, imageSize, zmq_free, NULL); + + std::cout << callbackHeader.udpPort << "copied data to data frame" << std::endl; + + //std::tuple msgTuple(&hmsg, &msg); + + { + std::cout << callbackHeader.udpPort << "getting lock" << std::endl; + std::lock_guard lock(stat->mtx); + //stat->cache[0][(long unsigned int)42] = nullptr; + std::cout << callbackHeader.udpPort << "put data in cache" << std::endl; + stat->frames[callbackHeader.udpPort][header.detHeader.frameNumber].push_back(hmsg); + stat->frames[callbackHeader.udpPort][header.detHeader.frameNumber].push_back(msg); + } + std::cout << callbackHeader.udpPort << "call not correlate" << std::endl; + sem_post(&stat->available); + // if data is modified, eg ROI and size is reduced - imageSize = 26000; + //imageSize = 26000; } /** @@ -211,6 +428,14 @@ int main(int argc, char *argv[]) { cprintf(RED, "Could not set handler function for SIGPIPE\n"); } + Status stat{false, numReceivers}; + + sem_init(&stat.available, 0, 0); + + void* user_data = static_cast(&stat); + + std::thread combinerThread(Correlate, &stat); + /** - loop over number of receivers */ for (int i = 0; i != numReceivers; ++i) { @@ -218,15 +443,13 @@ int main(int argc, char *argv[]) { sem_init(semaphore, 1, 0); semaphores.push_back(semaphore); threads.emplace_back([semaphore, i, startTCPPort, withCallback, - numReceivers]() { + numReceivers, user_data]() { sls::Receiver receiver(startTCPPort + i); - if (withCallback) { - receiver.registerCallBackStartAcquisition(StartAcq, nullptr); - receiver.registerCallBackAcquisitionFinished( - AcquisitionFinished, nullptr); - receiver.registerCallBackRawDataReady(GetData, nullptr); - } + receiver.registerCallBackStartAcquisition(StartAcq, user_data); + receiver.registerCallBackAcquisitionFinished( + AcquisitionFinished, user_data); + receiver.registerCallBackRawDataReady(GetData, user_data); /** - as long as no Ctrl+C */ sem_wait(semaphore); sem_destroy(semaphore); @@ -237,6 +460,12 @@ int main(int argc, char *argv[]) { thread.join(); } + std::cout << "Terminate Combiner" << std::endl; + stat.terminate = true; + sem_post(&stat.available); + combinerThread.join(); + sem_destroy(&stat.available); + std::cout << "Goodbye!\n"; return 0; }