Skip to content

Commit

Permalink
WIP of synchronisation
Browse files Browse the repository at this point in the history
  • Loading branch information
felix-engelmann committed Sep 12, 2024
1 parent 5f00db7 commit e9e1cb7
Showing 1 changed file with 252 additions and 22 deletions.
274 changes: 252 additions & 22 deletions slsReceiverSoftware/src/FrameSynchronizerApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,20 @@
#include "sls/sls_detector_defs.h"

#include <csignal> //SIGINT
#include <cstdio>
#include <cstring>
#include <iostream>
#include <ostream>
#include <semaphore.h>
#include <sys/wait.h> //wait
#include <thread>
#include <mutex>
#include <unistd.h>

#include <vector>
#include <set>
#include <zmq.h>

// gettid added in glibc 2.30
#if __GLIBC__ == 2 && __GLIBC_MINOR__ < 30
#include <sys/syscall.h>
Expand Down Expand Up @@ -52,6 +59,137 @@ std::string getHelpMessage() {
"for debugging), 0 for none (default)]\n\n");
}

void zmq_free (void *data, void *hint)
{
free (data);
}


struct Status{
bool terminate = false;
unsigned long num_receivers;

sem_t available;

std::mutex mtx;

std::vector<zmq_msg_t*> headers;
std::map<unsigned int, std::map<long unsigned int, std::vector<zmq_msg_t*> > > frames;

};

void print_frames(const std::map<unsigned int, std::map<long unsigned int, std::vector<zmq_msg_t*> > > &frames) {
for (const auto& outer_pair : frames) {
unsigned int udpPort = outer_pair.first;
const auto& trigger_map = outer_pair.second;

std::cout << "UDP port: " << udpPort << std::endl;

for (const auto& inner_pair : trigger_map) {
long unsigned int acqIndex = inner_pair.first;
const auto& msg_vector = inner_pair.second;

std::cout << " acq index: " << acqIndex << std::endl;
std::cout << " zmq_msg_t* Vector: ";

// Iterate over the vector of zmq_msg_t* and print each message pointer
for (const auto& msg : msg_vector) {
std::cout << " a frame " << msg ; // Print a space between each pointer
}

std::cout << std::endl;
}
}
}

std::set<long unsigned int> find_keys(const std::map<unsigned int, std::map<long unsigned int, std::vector<zmq_msg_t*> > >& maps) {
std::set<long unsigned int> all_keys; // Set to collect all unique keys across all maps
std::set<long unsigned int> valid_keys; // Set to store final valid keys

// If no maps are provided, return empty set
if (maps.empty()) {
return valid_keys;
}

// Collect all unique keys from all maps
std::cout << "Collecting all unique keys from the maps:\n";
for (const auto& [port, trigger_map] : maps) {
std::cout << "Map " << port << ": ";
for (const auto& [idx, msgs] : trigger_map) {
all_keys.insert(idx);
std::cout << idx << " ";
}
std::cout << std::endl;
}

std::cout << "All unique keys collected: ";
for (const auto& key : all_keys) {
std::cout << key << " ";
}
std::cout << "\n\n";

// Now check each key against all maps
for (const auto& key : all_keys) {
std::cout << "Checking key: " << key << std::endl;
bool is_valid = true;
for (const auto& [port, map] : maps) {
auto it = map.find(key);
if (it != map.end()) {
std::cout << " Key " << key << " found in map " << port << std::endl;
} else {
// Key is missing, check if the map has a larger key
std::cout << " Key " << key << " missing in map " << port;
auto upper_it = map.upper_bound(key);
if (upper_it != map.end()) {
std::cout << ", but found larger key: " << upper_it->first << std::endl;
} else {
std::cout << ", no larger key found. Key " << key << " is invalid.\n";
is_valid = false;
break;
}
}
}

if (is_valid) {
std::cout << " Key " << key << " is valid.\n\n";
valid_keys.insert(key);
} else {
std::cout << " Key " << key << " is not valid.\n\n";
}
}

return valid_keys;
}

void Correlate(Status *stat) {
bool starting = true;
std::vector<unsigned int> ports;
while (!stat->terminate) {
sem_wait(&(stat->available));
std::cout << "Correlate cache" << std::endl;
{
std::lock_guard<std::mutex> lock(stat->mtx);
if (starting) {
if (stat->headers.size() == stat->num_receivers) {
std::cout << "got all start messages" << std::endl;
starting = false;
ports.clear();

}
}
else {
std::cout << "sending data, common keys" << std::endl;
//print_frames(stat->frames);
auto common_keys = find_keys(stat->frames);
for (const auto& key : common_keys) {
std::cout << key << " ";
}
std::cout << "\n\n";
}
}
}
}

/**
* Start Acquisition Call back (slsFrameSynchronizer writes data if file write
* enabled) if registerCallBackRawDataReady or
Expand All @@ -73,6 +211,38 @@ int StartAcq(const slsDetectorDefs::startCallbackHeader callbackHeader,
<< "\n\tFile Index : " << callbackHeader.fileIndex
<< "\n\tQuad Enable : " << callbackHeader.quad
<< "\n\t]";
Status* stat = static_cast<Status*>(objectPointer);

std::ostringstream oss;
oss << "{\"htype\":\"header\""
<< ", \"udpPort\":" << sls::ToString(callbackHeader.udpPort)
<< ", \"filePath\":" << callbackHeader.filePath
<< "\"}\n";

std::string message = oss.str();
int length = message.length();
char* hdata = new char[length];

memcpy(hdata, message.c_str(), length);
zmq_msg_t *hmsg = new zmq_msg_t;
zmq_msg_init_data (hmsg, hdata, length, zmq_free, NULL);

{
std::lock_guard<std::mutex> lock(stat->mtx);
stat->headers.push_back(hmsg);
for(int port: callbackHeader.udpPort) {
std::cout << "clear cache for stream" << port << std::endl;
for (auto& pair : stat->frames[port]) {
std::cout << "clear data" << pair.first << std::endl;
for (auto msg : pair.second) {
zmq_msg_close(msg);
free(msg);
}
}
stat->frames[port].clear();
}
}
sem_post(&stat->available);
return 0;
}

Expand Down Expand Up @@ -155,6 +325,53 @@ void GetData(slsDetectorDefs::sls_receiver_header &header,
char *dataPointer, size_t imageSize, void *objectPointer) {
printDataCallBackHeader(header, callbackHeader, dataPointer, imageSize,
objectPointer);

Status* stat = static_cast<Status*>(objectPointer);

std::ostringstream oss;
oss << "{\"htype\":\"module\""
<< ", \"port\":" << callbackHeader.udpPort
<< "\"}\n";

std::string message = oss.str();
int length = message.length();

char* hdata = new char[length];

memcpy(hdata, message.c_str(), length);
std::cout << callbackHeader.udpPort << ":creating json part" << std::endl;
zmq_msg_t *hmsg = new zmq_msg_t;

zmq_msg_init_data (hmsg, hdata, length, zmq_free, NULL);

std::cout << callbackHeader.udpPort << "created header frame" << std::endl;
//zmq_msg_init_buffer (&hmsg, message, length);

char* data = new char[imageSize];

std::cout << callbackHeader.udpPort << "allocated new buffer" << std::endl;

//printf("data pointer %x, data %x\n", dataPointer, )
memcpy(data, dataPointer, imageSize);

std::cout << callbackHeader.udpPort << "copied buffer" << std::endl;
zmq_msg_t *msg = new zmq_msg_t;
zmq_msg_init_data (msg, data, imageSize, zmq_free, NULL);

std::cout << callbackHeader.udpPort << "copied data to data frame" << std::endl;

//std::tuple<zmq_msg_t *, zmq_msg_t *> msgTuple(&hmsg, &msg);

{
std::cout << callbackHeader.udpPort << "getting lock" << std::endl;
std::lock_guard<std::mutex> lock(stat->mtx);
//stat->cache[0][(long unsigned int)42] = nullptr;
std::cout << callbackHeader.udpPort << "put data in cache" << std::endl;
stat->frames[callbackHeader.udpPort][header.detHeader.frameNumber].push_back(hmsg);
stat->frames[callbackHeader.udpPort][header.detHeader.frameNumber].push_back(msg);
}
std::cout << callbackHeader.udpPort << "call not correlate" << std::endl;
sem_post(&stat->available);
}

/**
Expand Down Expand Up @@ -233,6 +450,14 @@ int main(int argc, char *argv[]) {
cprintf(RED, "Could not set handler function for SIGPIPE\n");
}

Status stat{false, numReceivers};

sem_init(&stat.available, 0, 0);

void* user_data = static_cast<void *>(&stat);

std::thread combinerThread(Correlate, &stat);

/** - loop over number of receivers */
for (int i = 0; i != numReceivers; ++i) {

Expand All @@ -241,33 +466,32 @@ int main(int argc, char *argv[]) {
semaphores.push_back(semaphore);

threads.emplace_back([semaphore, i, startTCPPort, withCallback,
numReceivers]() {
numReceivers, user_data]() {
sls::Receiver receiver(startTCPPort + i);

/** - register callbacks. remember to set file write enable to 0
(using the client) if we should not write files and you will write
data using the callbacks */
if (withCallback) {

/** - Call back for start acquisition */
cprintf(BLUE, "%d: Registering Start Acquisition Callback\n",
i);
receiver.registerCallBackStartAcquisition(StartAcq, nullptr);

/** - Call back for acquisition finished */
cprintf(BLUE, "%d: Registering Acquisition Finished Callback\n",
i);
receiver.registerCallBackAcquisitionFinished(
AcquisitionFinished, nullptr);

/* - Call back for raw data */
cprintf(BLUE, "%d: Registering Data Callback \n", i);
if (withCallback == 1)
receiver.registerCallBackRawDataReady(GetData, nullptr);
else if (withCallback == 2)
receiver.registerCallBackRawDataModifyReady(GetData,
nullptr);
}

/** - Call back for start acquisition */
cprintf(BLUE, "%d: Registering Start Acquisition Callback\n",
i);
receiver.registerCallBackStartAcquisition(StartAcq, user_data);

/** - Call back for acquisition finished */
cprintf(BLUE, "%d: Registering Acquisition Finished Callback\n",
i);
receiver.registerCallBackAcquisitionFinished(
AcquisitionFinished, user_data);

/* - Call back for raw data */
cprintf(BLUE, "%d: Registering Data Callback \n", i);
if (withCallback == 1)
receiver.registerCallBackRawDataReady(GetData, user_data);
else if (withCallback == 2)
receiver.registerCallBackRawDataModifyReady(GetData,
user_data);


/** - Print Ready and Instructions how to exit */
if (i == (numReceivers - 1)) {
Expand All @@ -284,6 +508,12 @@ int main(int argc, char *argv[]) {
thread.join();
}

std::cout << "Terminate Combiner" << std::endl;
stat.terminate = true;
sem_post(&stat.available);
combinerThread.join();
sem_destroy(&stat.available);

std::cout << "Goodbye!\n";
return 0;
}

0 comments on commit e9e1cb7

Please sign in to comment.