diff --git a/slsReceiverSoftware/src/FrameSynchronizerApp.cpp b/slsReceiverSoftware/src/FrameSynchronizerApp.cpp index 4af7e59897..7b28f9140f 100644 --- a/slsReceiverSoftware/src/FrameSynchronizerApp.cpp +++ b/slsReceiverSoftware/src/FrameSynchronizerApp.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include //wait #include #include @@ -66,6 +67,7 @@ void zmq_free (void *data, void *hint) struct Status{ + bool starting = true; bool terminate = false; unsigned long num_receivers; @@ -75,6 +77,7 @@ struct Status{ std::vector headers; std::map > > frames; + std::vector ends; }; @@ -161,20 +164,46 @@ std::set find_keys(const std::map& messages) { + size_t num_messages = messages.size(); + + // Iterate over each message in the vector + for (size_t i = 0; i < num_messages; ++i) { + zmq_msg_t* msg = messages[i]; + + // Determine flags: ZMQ_SNDMORE for all messages except the last + int flags = (i == num_messages - 1) ? 0 : ZMQ_SNDMORE; + + // Send the message part + if (zmq_msg_send(msg, socket, flags) == -1) { + std::cerr << "Error sending message: " << zmq_strerror(zmq_errno()) << std::endl; + return -1; // Return -1 on error + } + } + + return 0; // Return 0 on success +} + void Correlate(Status *stat) { - bool starting = true; - std::vector ports; + void *context = zmq_ctx_new (); + + void *socket = zmq_socket (context, ZMQ_PUSH); + int rc = zmq_bind (socket, "tcp://*:5555"); + if (rc != 0){ + std::cout << "failed to bind"; + } + while (!stat->terminate) { - sem_wait(&(stat->available)); std::cout << "Correlate cache" << std::endl; + sem_wait(&(stat->available)); { std::lock_guard lock(stat->mtx); - if (starting) { + if (stat->starting) { if (stat->headers.size() == stat->num_receivers) { std::cout << "got all start messages" << std::endl; - starting = false; - ports.clear(); - + stat->starting = false; + zmq_send_multipart(socket, stat->headers); + stat->headers.clear(); } } else { @@ -182,12 +211,30 @@ void Correlate(Status *stat) { //print_frames(stat->frames); auto common_keys = find_keys(stat->frames); for (const auto& key : common_keys) { + std::vector parts; + for (const auto& [port, trigger_map] : stat->frames) { + auto it = trigger_map.find(key); + if (it != trigger_map.end()) { + parts.insert(parts.end(), stat->frames[port][key].begin(), stat->frames[port][key].end()); + std::cout << " Key " << key << " found in map " << port << std::endl; + stat->frames[port].erase(key); + } + } std::cout << key << " "; + zmq_send_multipart(socket, parts); } std::cout << "\n\n"; } + if (stat->ends.size() == stat->num_receivers) { + std::cout << "all ends received, flushing" << std::endl; + // clean up all remaining frames + zmq_send_multipart(socket, stat->ends); + stat->ends.clear(); + } } } + zmq_close(socket); + zmq_ctx_destroy(context); } /** @@ -215,7 +262,7 @@ int StartAcq(const slsDetectorDefs::startCallbackHeader callbackHeader, std::ostringstream oss; oss << "{\"htype\":\"header\"" - << ", \"udpPort\":" << sls::ToString(callbackHeader.udpPort) + << ", \"udpPorts\":" << sls::ToString(callbackHeader.udpPort) << ", \"filePath\":" << callbackHeader.filePath << "\"}\n"; @@ -230,6 +277,7 @@ int StartAcq(const slsDetectorDefs::startCallbackHeader callbackHeader, { std::lock_guard lock(stat->mtx); stat->headers.push_back(hmsg); + stat->starting = true; for(int port: callbackHeader.udpPort) { std::cout << "clear cache for stream" << port << std::endl; for (auto& pair : stat->frames[port]) { @@ -259,6 +307,27 @@ void AcquisitionFinished( << "\n\tLast Frame Index : " << sls::ToString(callbackHeader.lastFrameIndex) << "\n\t]"; + + Status* stat = static_cast(objectPointer); + + std::ostringstream oss; + oss << "{\"htype\":\"series_end\"" + << ", \"udpPorts\":" << sls::ToString(callbackHeader.udpPort) + << "\"}\n"; + + std::string message = oss.str(); + int length = message.length(); + char* hdata = new char[length]; + + memcpy(hdata, message.c_str(), length); + zmq_msg_t *hmsg = new zmq_msg_t; + zmq_msg_init_data (hmsg, hdata, length, zmq_free, NULL); + + { + std::lock_guard lock(stat->mtx); + stat->ends.push_back(hmsg); + } + sem_post(&stat->available); } /** @@ -428,7 +497,7 @@ int main(int argc, char *argv[]) { cprintf(RED, "Could not set handler function for SIGPIPE\n"); } - Status stat{false, numReceivers}; + Status stat{true, false, numReceivers}; sem_init(&stat.available, 0, 0);