-
Notifications
You must be signed in to change notification settings - Fork 40
/
MJPEGWriter.cpp
131 lines (126 loc) · 4.39 KB
/
MJPEGWriter.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#include "MJPEGWriter.h"
#include <fstream>
void
MJPEGWriter::Listener()
{
// send http header
std::string header;
header += "HTTP/1.0 200 OK\r\n";
header += "Cache-Control: no-cache\r\n";
header += "Pragma: no-cache\r\n";
header += "Connection: close\r\n";
header += "Content-Type: multipart/x-mixed-replace; boundary=mjpegstream\r\n\r\n";
const int header_size = header.size();
char* header_data = (char*)header.data();
fd_set rread;
SOCKET maxfd;
this->open();
pthread_mutex_unlock(&mutex_writer);
while (true)
{
rread = master;
struct timeval to = { 0, timeout };
maxfd = sock + 1;
if (sock == INVALID_SOCKET){
return;
}
int sel = select(maxfd, &rread, NULL, NULL, &to);
if (sel > 0) {
for (int s = 0; s < maxfd; s++)
{
if (FD_ISSET(s, &rread) && s == sock)
{
int addrlen = sizeof(SOCKADDR);
SOCKADDR_IN address = { 0 };
SOCKET client = accept(sock, (SOCKADDR*)&address, (socklen_t*)&addrlen);
if (client == SOCKET_ERROR)
{
cerr << "error : couldn't accept connection on sock " << sock << " !" << endl;
return;
}
maxfd = (maxfd>client ? maxfd : client);
pthread_mutex_lock(&mutex_cout);
cout << "new client " << client << endl;
char headers[4096] = "\0";
int readBytes = _read(client, headers);
cout << headers;
pthread_mutex_unlock(&mutex_cout);
pthread_mutex_lock(&mutex_client);
_write(client, header_data, header_size);
clients.push_back(client);
pthread_mutex_unlock(&mutex_client);
}
}
}
usleep(1000);
}
}
void
MJPEGWriter::Writer()
{
pthread_mutex_lock(&mutex_writer);
pthread_mutex_unlock(&mutex_writer);
const int milis2wait = 16666;
while (this->isOpened())
{
pthread_mutex_lock(&mutex_client);
int num_connected_clients = clients.size();
pthread_mutex_unlock(&mutex_client);
if (!num_connected_clients) {
usleep(milis2wait);
continue;
}
pthread_t threads[NUM_CONNECTIONS];
int count = 0;
std::vector<uchar> outbuf;
std::vector<int> params;
params.push_back(CV_IMWRITE_JPEG_QUALITY);
params.push_back(quality);
pthread_mutex_lock(&mutex_writer);
imencode(".jpg", lastFrame, outbuf, params);
pthread_mutex_unlock(&mutex_writer);
int outlen = outbuf.size();
pthread_mutex_lock(&mutex_client);
std::vector<int>::iterator begin = clients.begin();
std::vector<int>::iterator end = clients.end();
pthread_mutex_unlock(&mutex_client);
std::vector<clientPayload*> payloads;
for (std::vector<int>::iterator it = begin; it != end; ++it, ++count)
{
if (count > NUM_CONNECTIONS)
break;
struct clientPayload *cp = new clientPayload({ (MJPEGWriter*)this, { outbuf.data(), outlen, *it } });
payloads.push_back(cp);
pthread_create(&threads[count], NULL, &MJPEGWriter::clientWrite_Helper, cp);
}
for (; count > 0; count--)
{
pthread_join(threads[count-1], NULL);
delete payloads.at(count-1);
}
usleep(milis2wait);
}
}
void
MJPEGWriter::ClientWrite(clientFrame & cf)
{
stringstream head;
head << "--mjpegstream\r\nContent-Type: image/jpeg\r\nContent-Length: " << cf.outlen << "\r\n\r\n";
string string_head = head.str();
pthread_mutex_lock(&mutex_client);
_write(cf.client, (char*) string_head.c_str(), string_head.size());
int n = _write(cf.client, (char*)(cf.outbuf), cf.outlen);
if (n < cf.outlen)
{
std::vector<int>::iterator it;
it = find (clients.begin(), clients.end(), cf.client);
if (it != clients.end())
{
cerr << "kill client " << cf.client << endl;
clients.erase(std::remove(clients.begin(), clients.end(), cf.client));
::shutdown(cf.client, 2);
}
}
pthread_mutex_unlock(&mutex_client);
pthread_exit(NULL);
}