Skip to content

Commit

Permalink
add properties for pedestal buffer length and parallel threads amount
Browse files Browse the repository at this point in the history
  • Loading branch information
lrlunin committed Nov 8, 2024
1 parent 78bb7dc commit d20ea9d
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 21 deletions.
3 changes: 1 addition & 2 deletions tango-moenchzmq/src/backend/CPUComputationBackend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ using namespace std;

CPUComputationBackend::CPUComputationBackend(FileWriter *fileWriter,
float PEDESTAL_BUFFER_LENGTH,
unsigned int THREAD_AMOUNT)
unsigned long long THREAD_AMOUNT)
: frame_ptr_queue(20000), fileWriter(fileWriter),
PEDESTAL_BUFFER_LENGTH(PEDESTAL_BUFFER_LENGTH),
THREAD_AMOUNT(THREAD_AMOUNT) {
Expand Down Expand Up @@ -217,7 +217,6 @@ void CPUComputationBackend::processFrame(FullFrame *ff_ptr) {
}
processed_frames_amount++;
memory_pool::free(ff_ptr);
std::cout << processed_frames_amount << std::endl;
}

OrderedFrame<char, consts::LENGTH> CPUComputationBackend::classifyFrame(
Expand Down
2 changes: 1 addition & 1 deletion tango-moenchzmq/src/backend/CPUComputationBackend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class CPUComputationBackend {
std::atomic<long> processed_frames_amount{ 0 };

CPUComputationBackend(FileWriter *fileWriter, float PEDESTAL_BUFFER_LENGTH,
unsigned int THREAD_AMOUNT);
unsigned long long THREAD_AMOUNT);
CPUComputationBackend(FileWriter *fileWriter);
~CPUComputationBackend();
void initThreads();
Expand Down
5 changes: 3 additions & 2 deletions tango-moenchzmq/src/backend/ZMQListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@

using namespace std;

ZMQListener::ZMQListener(std::string socket_addr, std::string socket_port) {
ZMQListener::ZMQListener(std::string socket_addr,
unsigned long long socket_port) {
if (!socket_addr.starts_with("tcp://")) {
socket_addr = "tcp://" + socket_addr;
}
std::string full_address = socket_addr + ":" + socket_port;
std::string full_address = socket_addr + ":" + std::to_string(socket_port);
context = zmq::context_t(1);
socket = zmq::socket_t(context, ZMQ_SUB);
socket.connect(full_address.c_str());
Expand Down
2 changes: 1 addition & 1 deletion tango-moenchzmq/src/backend/ZMQListener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ZMQListener {
public:
CPUComputationBackend *comp_backend_ptr;
std::atomic<long> received_frames_amount;
ZMQListener(std::string socket_addr, std::string socket_port);
ZMQListener(std::string socket_addr, unsigned long long socket_port);
void listen_socket();
void start_receive();
void stop_receive();
Expand Down
42 changes: 30 additions & 12 deletions tango-moenchzmq/src/tangods/MoenchZMQ.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ void MoenchZMQ::init_device() {

file_writer_ptr = new HDFWriter(SAVE_ROOT_PATH);
zmq_listener_ptr = new ZMQListener(ZMQ_RX_IP, ZMQ_RX_PORT);
zmq_listener_ptr->comp_backend_ptr
= new CPUComputationBackend(file_writer_ptr);
zmq_listener_ptr->comp_backend_ptr = new CPUComputationBackend(
file_writer_ptr, PEDESTAL_BUFFER_LENGTH, THREAD_AMOUNT);

attr_file_index_read = new Tango::DevULong[1];
attr_file_name_read = new Tango::DevString[1];
Expand Down Expand Up @@ -136,24 +136,42 @@ void MoenchZMQ::init_device() {
//--------------------------------------------------------
void MoenchZMQ::get_device_property() {
// Initialize property data members

ZMQ_RX_IP = "127.0.0.1";
ZMQ_RX_PORT = 50003;
SAVE_ROOT_PATH = "/home/data";
THREAD_AMOUNT = 2;
PEDESTAL_BUFFER_LENGTH = 5000;
mandatoryNotDefined = false;
Tango::DbData dev_prop{ Tango::DbDatum("ZMQ_RX_IP"),
Tango::DbDatum("ZMQ_RX_PORT"),
Tango::DbDatum("SAVE_ROOT_PATH") };
Tango::DbDatum("SAVE_ROOT_PATH"),
Tango::DbDatum("THREAD_AMOUNT"),
Tango::DbDatum("PEDESTAL_BUFFER_LENGTH") };
get_db_device()->get_property(dev_prop);
// if any of the properties is empty, mark the device as not initialized
for (auto &prop : dev_prop) {
if (prop.is_empty()) {
ERROR_STREAM << "Property " << prop.name << " not set" << std::endl;
mandatoryNotDefined = true;
if (!prop.is_empty()) {
if (prop.name == "ZMQ_RX_IP") {
prop >> ZMQ_RX_IP;
} else if (prop.name == "ZMQ_RX_PORT") {
prop >> ZMQ_RX_PORT;
} else if (prop.name == "SAVE_ROOT_PATH") {
prop >> SAVE_ROOT_PATH;
} else if (prop.name == "THREAD_AMOUNT") {
prop >> THREAD_AMOUNT;
} else if (prop.name == "PEDESTAL_BUFFER_LENGTH") {
prop >> PEDESTAL_BUFFER_LENGTH;
} else
DEBUG_STREAM << "Not defined, use default value for " << prop.name
<< std::endl;
}
}
dev_prop[0] >> ZMQ_RX_IP;
dev_prop[1] >> ZMQ_RX_PORT;
dev_prop[2] >> SAVE_ROOT_PATH;

// Check device property data members init
DEBUG_STREAM << "ZMQ_RX_IP: " << ZMQ_RX_IP << std::endl;
DEBUG_STREAM << "ZMQ_RX_PORT: " << ZMQ_RX_PORT << std::endl;
DEBUG_STREAM << "SAVE_ROOT_PATH: " << SAVE_ROOT_PATH << std::endl;
DEBUG_STREAM << "THREAD_AMOUNT: " << THREAD_AMOUNT << std::endl;
DEBUG_STREAM << "PEDESTAL_BUFFER_LENGTH: " << PEDESTAL_BUFFER_LENGTH
<< std::endl;
}

void MoenchZMQ::load_images_previews() {
Expand Down
6 changes: 5 additions & 1 deletion tango-moenchzmq/src/tangods/MoenchZMQ.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ class MoenchZMQ : public TANGO_BASE_CLASS {
// ZMQ_IP:
std::string ZMQ_RX_IP;
// ZMQ_PORT:
std::string ZMQ_RX_PORT;
Tango::DevULong64 ZMQ_RX_PORT;
// SAVE_ROOT_PATH
std::string SAVE_ROOT_PATH;
// THREAD_AMOUNT
Tango::DevULong64 THREAD_AMOUNT;
// PEDESTAL_BUFFER_LENGTH
Tango::DevFloat PEDESTAL_BUFFER_LENGTH;
bool mandatoryNotDefined;

// Attribute data members
Expand Down
26 changes: 24 additions & 2 deletions tango-moenchzmq/src/tangods/MoenchZMQClass.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ void MoenchZMQClass::set_default_property() {
add_wiz_dev_prop(prop_name, prop_desc);
prop_name = "ZMQ_RX_PORT";
prop_desc = "Port of the ZMQ upstream\nNeed to match with the config";
prop_def = "Default: 5XXXX";
prop_def = "50003";
vect_data.clear();
if (prop_def.length() > 0) {
Tango::DbDatum data(prop_name);
Expand All @@ -309,7 +309,29 @@ void MoenchZMQClass::set_default_property() {
prop_name = "SAVE_ROOT_PATH";
prop_desc = "Root path for saving the images\nA folder with the date will "
"be created";
prop_def = "Default: /home/data";
prop_def = "/home/data";
vect_data.clear();
if (prop_def.length() > 0) {
Tango::DbDatum data(prop_name);
data << vect_data;
dev_def_prop.push_back(data);
add_wiz_dev_prop(prop_name, prop_desc, prop_def);
} else
add_wiz_dev_prop(prop_name, prop_desc);
prop_name = "THREAD_AMOUNT";
prop_desc = "Number of threads for parallel processing";
prop_def = "10";
vect_data.clear();
if (prop_def.length() > 0) {
Tango::DbDatum data(prop_name);
data << vect_data;
dev_def_prop.push_back(data);
add_wiz_dev_prop(prop_name, prop_desc, prop_def);
} else
add_wiz_dev_prop(prop_name, prop_desc);
prop_name = "PEDESTAL_BUFFER_LENGTH";
prop_desc = "N last frames to calculate pedestal";
prop_def = "5000";
vect_data.clear();
if (prop_def.length() > 0) {
Tango::DbDatum data(prop_name);
Expand Down

0 comments on commit d20ea9d

Please sign in to comment.