-
Notifications
You must be signed in to change notification settings - Fork 0
/
ImagingServer.cpp
153 lines (114 loc) · 5.35 KB
/
ImagingServer.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#include "ImagingServer.hpp"
ImagingServer::ImagingServer(unsigned short port, uint8_t nodeID) :
AbstractBroadcastNode<ImageChunkPacket>(nodeID, port, "ImgSrv"),
_mutex_image_map(), _image_map(), _mutex_building_image_map(), _building_image_map(),
_thread_check_timeout_imgs() {
}
ImagingServer::~ImagingServer() {
if (_thread_check_timeout_imgs.joinable()) {
_thread_check_timeout_imgs.join();
}
}
void ImagingServer::_tr_check_timeout_imgs() {
uint32_t actualTimestamp;
loguru::set_thread_name(threadname("checkImgTO").c_str());
LOG_F(INFO, "Starting ImagingSrv _check_timeout_imgs");
std::map<Position, ImageBuilder>::const_iterator it;
while (! process_stop) {
{ // Scope spécifique sinon lock_guard va dans le sleep
std::lock_guard<std::mutex> lock_building(_mutex_building_image_map);
for (it = _building_image_map.cbegin(); it != _building_image_map.cend(); /*no increment*/ ) {
actualTimestamp = static_cast<uint32_t>(std::time(nullptr));
if ((actualTimestamp - ((it->second).get_timestamp())) > _image_reception_timeout) {
// No response
LOG_F(WARNING, "Image lost from NodeID=%d, Lost rate : %d%%", (it->second).get_nodeid(), (it->second).get_loss_percent());
_building_image_map.erase(it++);
}
else {
++it;
}
}
}
std::this_thread::sleep_for(std::chrono::seconds(_image_reception_check));
}
LOG_F(INFO, "ImagingSrv _check_completed_img - process_stop=true; exiting");
}
ImageChunkPacket ImagingServer::_produce_packet() {
ImageChunkPacket packet = AbstractBroadcastNode<ImageChunkPacket>::_produce_packet();
LOG_F(3, "Generated packet: %s", packet.repr().c_str());
return packet;
}
void ImagingServer::_process_packet(const ImageChunkPacket& packet) {
std::lock_guard<std::mutex> lock_building(_mutex_building_image_map);
if (_building_image_map.find(packet.position) == _building_image_map.end()) { // New Image
_building_image_map.emplace(packet.position, ImageBuilder(packet));
}
else {
_building_image_map[packet.position].add_chunk(packet);
if (_building_image_map[packet.position].is_complete()) {
{
std::lock_guard<std::mutex> lock(_mutex_image_map);
_image_map[packet.position] = _building_image_map[packet.position].get_image();
}
LOG_F(3, "Image moved to _image_map: %s", packet.repr().c_str());
_building_image_map.erase(packet.position); // image completed -> delete ImageBuilder
LOG_F(3, "Image deleted from _building_image_map : %s", packet.repr().c_str());
}
}
}
void ImagingServer::_send_image() {
ImageChunkPacket packet = _produce_packet();
uint32_t size_file_remaining;
uint32_t bytes_to_treat(IMG_CHUNK_SIZE);
// TODO rm -hardcoded
FILE* image_file;
char path_img[](INPUT_FILE(_nodeID));
memset(&packet.chunk_content, '\0', IMG_CHUNK_SIZE);
packet.sizeImage = get_size(path_img);
size_file_remaining = packet.sizeImage;
image_file = fopen(path_img, "rb");
if (IMG_CHUNK_SIZE > size_file_remaining) {
bytes_to_treat = size_file_remaining;
}
else {
bytes_to_treat = IMG_CHUNK_SIZE;
}
while(size_file_remaining > 0 and !feof(image_file)) {
fread(&packet.chunk_content[0], sizeof(char), bytes_to_treat, image_file);
this->broadcast(packet);
LOG_F(7, "ImageChunkPacket: %s", packet.repr().c_str());
size_file_remaining -= bytes_to_treat;
packet.offset += bytes_to_treat;
memset(&packet.chunk_content, '\0', IMG_CHUNK_SIZE);
if (IMG_CHUNK_SIZE <= size_file_remaining) {
bytes_to_treat = IMG_CHUNK_SIZE;
}
else {
bytes_to_treat = size_file_remaining;
}
std::this_thread::sleep_for(std::chrono::milliseconds(IMAGE_SEND_SLEEP_SEPARATOR));
}
fclose(image_file);
}
void ImagingServer::start() {
LOG_F(WARNING, "Starting Imaging server");
AbstractBroadcastNode<ImageChunkPacket>::start();
_thread_check_timeout_imgs = std::thread(&ImagingServer::_tr_check_timeout_imgs, this);
}
void ImagingServer::join() {
AbstractBroadcastNode<ImageChunkPacket>::join();
_thread_check_timeout_imgs.join();
LOG_F(WARNING, "ImgServer: joined all threads");
}
inline bool ImagingServer::_to_be_ignored(const ImageChunkPacket& packet) const {
bool chunk_already_received;
bool image_already_constructed;
{
std::lock_guard<std::mutex> lock_building(_mutex_building_image_map);
std::lock_guard<std::mutex> lock_constructed(_mutex_image_map);
chunk_already_received = (_building_image_map.find(packet.position) != _building_image_map.end()) && (_building_image_map.at(packet.position).is_chunk_already_received(packet));
image_already_constructed = _image_map.find(packet.position) != _image_map.end() && packet.timestamp == _image_map.at(packet.position).timestamp;
}
// ignore self NodeID || ignore packets for already received chunks || ignore packets for already build images
return AbstractBroadcastNode<ImageChunkPacket>::_to_be_ignored(packet) || chunk_already_received || image_already_constructed;
}