Skip to content

Commit

Permalink
WIP of synchronisation
Browse files Browse the repository at this point in the history
  • Loading branch information
felix-engelmann committed Sep 12, 2024
1 parent 6715243 commit e1c780c
Showing 1 changed file with 237 additions and 8 deletions.
245 changes: 237 additions & 8 deletions slsReceiverSoftware/src/FrameSynchronizerApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,20 @@
#include "sls/sls_detector_defs.h"

#include <csignal> //SIGINT
#include <cstdio>
#include <cstring>
#include <iostream>
#include <ostream>
#include <semaphore.h>
#include <sys/wait.h> //wait
#include <thread>
#include <mutex>
#include <unistd.h>

#include <vector>
#include <set>
#include <zmq.h>

// gettid added in glibc 2.30
#if __GLIBC__ == 2 && __GLIBC_MINOR__ < 30
#include <sys/syscall.h>
Expand Down Expand Up @@ -52,6 +59,137 @@ std::string getHelpMessage() {
"for debugging), 0 for none (default)]\n\n");
}

void zmq_free (void *data, void *hint)
{
free (data);
}


struct Status{
bool terminate = false;
unsigned long num_receivers;

sem_t available;

std::mutex mtx;

std::vector<zmq_msg_t*> headers;
std::map<unsigned int, std::map<long unsigned int, std::vector<zmq_msg_t*> > > frames;

};

void print_frames(const std::map<unsigned int, std::map<long unsigned int, std::vector<zmq_msg_t*> > > &frames) {
for (const auto& outer_pair : frames) {
unsigned int udpPort = outer_pair.first;
const auto& trigger_map = outer_pair.second;

std::cout << "UDP port: " << udpPort << std::endl;

for (const auto& inner_pair : trigger_map) {
long unsigned int acqIndex = inner_pair.first;
const auto& msg_vector = inner_pair.second;

std::cout << " acq index: " << acqIndex << std::endl;
std::cout << " zmq_msg_t* Vector: ";

// Iterate over the vector of zmq_msg_t* and print each message pointer
for (const auto& msg : msg_vector) {
std::cout << " a frame " << msg ; // Print a space between each pointer
}

std::cout << std::endl;
}
}
}

std::set<long unsigned int> find_keys(const std::map<unsigned int, std::map<long unsigned int, std::vector<zmq_msg_t*> > >& maps) {
std::set<long unsigned int> all_keys; // Set to collect all unique keys across all maps
std::set<long unsigned int> valid_keys; // Set to store final valid keys

// If no maps are provided, return empty set
if (maps.empty()) {
return valid_keys;
}

// Collect all unique keys from all maps
std::cout << "Collecting all unique keys from the maps:\n";
for (const auto& [port, trigger_map] : maps) {
std::cout << "Map " << port << ": ";
for (const auto& [idx, msgs] : trigger_map) {
all_keys.insert(idx);
std::cout << idx << " ";
}
std::cout << std::endl;
}

std::cout << "All unique keys collected: ";
for (const auto& key : all_keys) {
std::cout << key << " ";
}
std::cout << "\n\n";

// Now check each key against all maps
for (const auto& key : all_keys) {
std::cout << "Checking key: " << key << std::endl;
bool is_valid = true;
for (const auto& [port, map] : maps) {
auto it = map.find(key);
if (it != map.end()) {
std::cout << " Key " << key << " found in map " << port << std::endl;
} else {
// Key is missing, check if the map has a larger key
std::cout << " Key " << key << " missing in map " << port;
auto upper_it = map.upper_bound(key);
if (upper_it != map.end()) {
std::cout << ", but found larger key: " << upper_it->first << std::endl;
} else {
std::cout << ", no larger key found. Key " << key << " is invalid.\n";
is_valid = false;
break;
}
}
}

if (is_valid) {
std::cout << " Key " << key << " is valid.\n\n";
valid_keys.insert(key);
} else {
std::cout << " Key " << key << " is not valid.\n\n";
}
}

return valid_keys;
}

void Correlate(Status *stat) {
bool starting = true;
std::vector<unsigned int> ports;
while (!stat->terminate) {
sem_wait(&(stat->available));
std::cout << "Correlate cache" << std::endl;
{
std::lock_guard<std::mutex> lock(stat->mtx);
if (starting) {
if (stat->headers.size() == stat->num_receivers) {
std::cout << "got all start messages" << std::endl;
starting = false;
ports.clear();

}
}
else {
std::cout << "sending data, common keys" << std::endl;
//print_frames(stat->frames);
auto common_keys = find_keys(stat->frames);
for (const auto& key : common_keys) {
std::cout << key << " ";
}
std::cout << "\n\n";
}
}
}
}

/**
* Start Acquisition Call back (slsFrameSynchronizer writes data if file write
* enabled) if registerCallBackRawDataReady or
Expand All @@ -73,6 +211,38 @@ int StartAcq(const slsDetectorDefs::startCallbackHeader callbackHeader,
<< "\n\tFile Index : " << callbackHeader.fileIndex
<< "\n\tQuad Enable : " << callbackHeader.quad
<< "\n\t]";
Status* stat = static_cast<Status*>(objectPointer);

std::ostringstream oss;
oss << "{\"htype\":\"header\""
<< ", \"udpPort\":" << sls::ToString(callbackHeader.udpPort)
<< ", \"filePath\":" << callbackHeader.filePath
<< "\"}\n";

std::string message = oss.str();
int length = message.length();
char* hdata = new char[length];

memcpy(hdata, message.c_str(), length);
zmq_msg_t *hmsg = new zmq_msg_t;
zmq_msg_init_data (hmsg, hdata, length, zmq_free, NULL);

{
std::lock_guard<std::mutex> lock(stat->mtx);
stat->headers.push_back(hmsg);
for(int port: callbackHeader.udpPort) {
std::cout << "clear cache for stream" << port << std::endl;
for (auto& pair : stat->frames[port]) {
std::cout << "clear data" << pair.first << std::endl;
for (auto msg : pair.second) {
zmq_msg_close(msg);
free(msg);
}
}
stat->frames[port].clear();
}
}
sem_post(&stat->available);
return 0;
}

Expand Down Expand Up @@ -149,8 +319,55 @@ void GetData(slsDetectorDefs::sls_receiver_header &header,
// header->packetsMask.to_string().c_str(),
((uint8_t)(*((uint8_t *)(dataPointer)))), imageSize);

Status* stat = static_cast<Status*>(objectPointer);

std::ostringstream oss;
oss << "{\"htype\":\"module\""
<< ", \"port\":" << callbackHeader.udpPort
<< "\"}\n";

std::string message = oss.str();
int length = message.length();

char* hdata = new char[length];

memcpy(hdata, message.c_str(), length);
std::cout << callbackHeader.udpPort << ":creating json part" << std::endl;
zmq_msg_t *hmsg = new zmq_msg_t;

zmq_msg_init_data (hmsg, hdata, length, zmq_free, NULL);

std::cout << callbackHeader.udpPort << "created header frame" << std::endl;
//zmq_msg_init_buffer (&hmsg, message, length);

char* data = new char[imageSize];

std::cout << callbackHeader.udpPort << "allocated new buffer" << std::endl;

//printf("data pointer %x, data %x\n", dataPointer, )
memcpy(data, dataPointer, imageSize);

std::cout << callbackHeader.udpPort << "copied buffer" << std::endl;
zmq_msg_t *msg = new zmq_msg_t;
zmq_msg_init_data (msg, data, imageSize, zmq_free, NULL);

std::cout << callbackHeader.udpPort << "copied data to data frame" << std::endl;

//std::tuple<zmq_msg_t *, zmq_msg_t *> msgTuple(&hmsg, &msg);

{
std::cout << callbackHeader.udpPort << "getting lock" << std::endl;
std::lock_guard<std::mutex> lock(stat->mtx);
//stat->cache[0][(long unsigned int)42] = nullptr;
std::cout << callbackHeader.udpPort << "put data in cache" << std::endl;
stat->frames[callbackHeader.udpPort][header.detHeader.frameNumber].push_back(hmsg);
stat->frames[callbackHeader.udpPort][header.detHeader.frameNumber].push_back(msg);
}
std::cout << callbackHeader.udpPort << "call not correlate" << std::endl;
sem_post(&stat->available);

// if data is modified, eg ROI and size is reduced
imageSize = 26000;
//imageSize = 26000;
}

/**
Expand Down Expand Up @@ -211,22 +428,28 @@ int main(int argc, char *argv[]) {
cprintf(RED, "Could not set handler function for SIGPIPE\n");
}

Status stat{false, numReceivers};

sem_init(&stat.available, 0, 0);

void* user_data = static_cast<void *>(&stat);

std::thread combinerThread(Correlate, &stat);

/** - loop over number of receivers */
for (int i = 0; i != numReceivers; ++i) {

sem_t *semaphore = new sem_t;
sem_init(semaphore, 1, 0);
semaphores.push_back(semaphore);
threads.emplace_back([semaphore, i, startTCPPort, withCallback,
numReceivers]() {
numReceivers, user_data]() {
sls::Receiver receiver(startTCPPort + i);
if (withCallback) {
receiver.registerCallBackStartAcquisition(StartAcq, nullptr);
receiver.registerCallBackAcquisitionFinished(
AcquisitionFinished, nullptr);
receiver.registerCallBackRawDataReady(GetData, nullptr);
}

receiver.registerCallBackStartAcquisition(StartAcq, user_data);
receiver.registerCallBackAcquisitionFinished(
AcquisitionFinished, user_data);
receiver.registerCallBackRawDataReady(GetData, user_data);
/** - as long as no Ctrl+C */
sem_wait(semaphore);
sem_destroy(semaphore);
Expand All @@ -237,6 +460,12 @@ int main(int argc, char *argv[]) {
thread.join();
}

std::cout << "Terminate Combiner" << std::endl;
stat.terminate = true;
sem_post(&stat.available);
combinerThread.join();
sem_destroy(&stat.available);

std::cout << "Goodbye!\n";
return 0;
}

0 comments on commit e1c780c

Please sign in to comment.