Skip to content

Commit

Permalink
working so far if everything goes right
Browse files Browse the repository at this point in the history
  • Loading branch information
felix-engelmann committed Sep 12, 2024
1 parent e1c780c commit e798068
Showing 1 changed file with 78 additions and 9 deletions.
87 changes: 78 additions & 9 deletions slsReceiverSoftware/src/FrameSynchronizerApp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <iostream>
#include <ostream>
#include <semaphore.h>
#include <sys/socket.h>
#include <sys/wait.h> //wait
#include <thread>
#include <mutex>
Expand Down Expand Up @@ -66,6 +67,7 @@ void zmq_free (void *data, void *hint)


struct Status{
bool starting = true;
bool terminate = false;
unsigned long num_receivers;

Expand All @@ -75,6 +77,7 @@ struct Status{

std::vector<zmq_msg_t*> headers;
std::map<unsigned int, std::map<long unsigned int, std::vector<zmq_msg_t*> > > frames;
std::vector<zmq_msg_t*> ends;

};

Expand Down Expand Up @@ -161,33 +164,77 @@ std::set<long unsigned int> find_keys(const std::map<unsigned int, std::map<long
return valid_keys;
}

int zmq_send_multipart(void* socket, const std::vector<zmq_msg_t*>& messages) {
size_t num_messages = messages.size();

// Iterate over each message in the vector
for (size_t i = 0; i < num_messages; ++i) {
zmq_msg_t* msg = messages[i];

// Determine flags: ZMQ_SNDMORE for all messages except the last
int flags = (i == num_messages - 1) ? 0 : ZMQ_SNDMORE;

// Send the message part
if (zmq_msg_send(msg, socket, flags) == -1) {
std::cerr << "Error sending message: " << zmq_strerror(zmq_errno()) << std::endl;
return -1; // Return -1 on error
}
}

return 0; // Return 0 on success
}

void Correlate(Status *stat) {
bool starting = true;
std::vector<unsigned int> ports;
void *context = zmq_ctx_new ();

void *socket = zmq_socket (context, ZMQ_PUSH);
int rc = zmq_bind (socket, "tcp://*:5555");
if (rc != 0){
std::cout << "failed to bind";
}

while (!stat->terminate) {
sem_wait(&(stat->available));
std::cout << "Correlate cache" << std::endl;
sem_wait(&(stat->available));
{
std::lock_guard<std::mutex> lock(stat->mtx);
if (starting) {
if (stat->starting) {
if (stat->headers.size() == stat->num_receivers) {
std::cout << "got all start messages" << std::endl;
starting = false;
ports.clear();

stat->starting = false;
zmq_send_multipart(socket, stat->headers);
stat->headers.clear();
}
}
else {
std::cout << "sending data, common keys" << std::endl;
//print_frames(stat->frames);
auto common_keys = find_keys(stat->frames);
for (const auto& key : common_keys) {
std::vector<zmq_msg_t *> parts;
for (const auto& [port, trigger_map] : stat->frames) {
auto it = trigger_map.find(key);
if (it != trigger_map.end()) {
parts.insert(parts.end(), stat->frames[port][key].begin(), stat->frames[port][key].end());
std::cout << " Key " << key << " found in map " << port << std::endl;
stat->frames[port].erase(key);
}
}
std::cout << key << " ";
zmq_send_multipart(socket, parts);
}
std::cout << "\n\n";
}
if (stat->ends.size() == stat->num_receivers) {
std::cout << "all ends received, flushing" << std::endl;
// clean up all remaining frames
zmq_send_multipart(socket, stat->ends);
stat->ends.clear();
}
}
}
zmq_close(socket);
zmq_ctx_destroy(context);
}

/**
Expand Down Expand Up @@ -215,7 +262,7 @@ int StartAcq(const slsDetectorDefs::startCallbackHeader callbackHeader,

std::ostringstream oss;
oss << "{\"htype\":\"header\""
<< ", \"udpPort\":" << sls::ToString(callbackHeader.udpPort)
<< ", \"udpPorts\":" << sls::ToString(callbackHeader.udpPort)
<< ", \"filePath\":" << callbackHeader.filePath
<< "\"}\n";

Expand All @@ -230,6 +277,7 @@ int StartAcq(const slsDetectorDefs::startCallbackHeader callbackHeader,
{
std::lock_guard<std::mutex> lock(stat->mtx);
stat->headers.push_back(hmsg);
stat->starting = true;
for(int port: callbackHeader.udpPort) {
std::cout << "clear cache for stream" << port << std::endl;
for (auto& pair : stat->frames[port]) {
Expand Down Expand Up @@ -259,6 +307,27 @@ void AcquisitionFinished(
<< "\n\tLast Frame Index : "
<< sls::ToString(callbackHeader.lastFrameIndex)
<< "\n\t]";

Status* stat = static_cast<Status*>(objectPointer);

std::ostringstream oss;
oss << "{\"htype\":\"series_end\""
<< ", \"udpPorts\":" << sls::ToString(callbackHeader.udpPort)
<< "\"}\n";

std::string message = oss.str();
int length = message.length();
char* hdata = new char[length];

memcpy(hdata, message.c_str(), length);
zmq_msg_t *hmsg = new zmq_msg_t;
zmq_msg_init_data (hmsg, hdata, length, zmq_free, NULL);

{
std::lock_guard<std::mutex> lock(stat->mtx);
stat->ends.push_back(hmsg);
}
sem_post(&stat->available);
}

/**
Expand Down Expand Up @@ -428,7 +497,7 @@ int main(int argc, char *argv[]) {
cprintf(RED, "Could not set handler function for SIGPIPE\n");
}

Status stat{false, numReceivers};
Status stat{true, false, numReceivers};

sem_init(&stat.available, 0, 0);

Expand Down

0 comments on commit e798068

Please sign in to comment.