-
Notifications
You must be signed in to change notification settings - Fork 182
/
Messager.cpp
435 lines (365 loc) · 12.7 KB
/
Messager.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
/***************************************************/
/*! \class Messager
\brief STK input control message parser.
This class reads and parses control messages from a variety of
sources, such as a scorefile, MIDI port, socket connection, or
stdin. MIDI messages are retrieved using the RtMidi class. All
other input sources (scorefile, socket, or stdin) are assumed to
provide SKINI formatted messages. This class can be compiled with
generic, non-realtime support, in which case only scorefile
reading is possible.
The various \e realtime message acquisition mechanisms (from MIDI,
socket, or stdin) take place asynchronously, filling the message
queue. A call to popMessage() will pop the next available control
message from the queue and return it via the referenced Message
structure. When a \e non-realtime scorefile is set, it is not
possible to start reading realtime input messages (from MIDI,
socket, or stdin). Likewise, it is not possible to read from a
scorefile when a realtime input mechanism is running.
When MIDI input is started, input is also automatically read from
stdin. This allows for program termination via the terminal
window. An __SK_Exit_ message is pushed onto the stack whenever
an "exit" or "Exit" message is received from stdin or when all
socket connections close and no stdin thread is running.
This class is primarily for use in STK example programs but it is
generic enough to work in many other contexts.
by Perry R. Cook and Gary P. Scavone, 1995--2023.
*/
/***************************************************/
#include "Messager.h"
#include <iostream>
#include <algorithm>
#include "SKINImsg.h"
namespace stk {
#if defined(__STK_REALTIME__)
extern "C" THREAD_RETURN THREAD_TYPE stdinHandler(void * ptr);
extern "C" THREAD_RETURN THREAD_TYPE socketHandler(void * ptr);
#endif // __STK_REALTIME__
typedef int MessagerSourceType;
MessagerSourceType STK_FILE = 0x1;
MessagerSourceType STK_MIDI = 0x2;
MessagerSourceType STK_STDIN = 0x4;
MessagerSourceType STK_SOCKET = 0x8;
Messager :: Messager()
{
data_.sources = 0;
data_.queueLimit = DEFAULT_QUEUE_LIMIT;
#if defined(__STK_REALTIME__)
data_.socket = 0;
data_.midi = 0;
#endif
}
Messager :: ~Messager()
{
// Clear the queue in case any thread is waiting on its limit.
#if defined(__STK_REALTIME__)
data_.mutex.lock();
#endif
while ( data_.queue.size() ) data_.queue.pop();
data_.sources = 0;
#if defined(__STK_REALTIME__)
data_.mutex.unlock();
if ( data_.socket ) {
socketThread_.wait();
delete data_.socket;
}
if ( data_.midi ) delete data_.midi;
#endif
}
bool Messager :: setScoreFile( const char* filename )
{
if ( data_.sources ) {
if ( data_.sources == STK_FILE ) {
oStream_ << "Messager::setScoreFile: already reading a scorefile!";
handleError( StkError::WARNING );
}
else {
oStream_ << "Messager::setScoreFile: already reading realtime control input ... cannot do scorefile input too!";
handleError( StkError::WARNING );
}
return false;
}
if ( !data_.skini.setFile( filename ) ) return false;
data_.sources = STK_FILE;
return true;
}
void Messager :: popMessage( Skini::Message& message )
{
if ( data_.sources == STK_FILE ) { // scorefile input
if ( !data_.skini.nextMessage( message ) )
message.type = __SK_Exit_;
return;
}
if ( data_.queue.size() == 0 ) {
// An empty (or invalid) message is indicated by a type = 0.
message.type = 0;
return;
}
// Copy queued message to the message pointer structure and then "pop" it.
#if defined(__STK_REALTIME__)
data_.mutex.lock();
#endif
message = data_.queue.front();
data_.queue.pop();
#if defined(__STK_REALTIME__)
data_.mutex.unlock();
#endif
}
void Messager :: pushMessage( Skini::Message& message )
{
#if defined(__STK_REALTIME__)
data_.mutex.lock();
#endif
data_.queue.push( message );
#if defined(__STK_REALTIME__)
data_.mutex.unlock();
#endif
}
#if defined(__STK_REALTIME__)
bool Messager :: startStdInput()
{
if ( data_.sources == STK_FILE ) {
oStream_ << "Messager::startStdInput: already reading a scorefile ... cannot do realtime control input too!";
handleError( StkError::WARNING );
return false;
}
if ( data_.sources & STK_STDIN ) {
oStream_ << "Messager::startStdInput: stdin input thread already started.";
handleError( StkError::WARNING );
return false;
}
// Start the stdin input thread.
if ( !stdinThread_.start( (THREAD_FUNCTION)&stdinHandler, &data_ ) ) {
oStream_ << "Messager::startStdInput: unable to start stdin input thread!";
handleError( StkError::WARNING );
return false;
}
data_.sources |= STK_STDIN;
return true;
}
THREAD_RETURN THREAD_TYPE stdinHandler(void *ptr)
{
Messager::MessagerData *data = (Messager::MessagerData *) ptr;
Skini::Message message;
std::string line;
while ( !std::getline( std::cin, line).eof() ) {
if ( line.empty() ) continue;
if ( line.compare(0, 4, "Exit") == 0 || line.compare(0, 4, "exit") == 0 )
break;
data->mutex.lock();
if ( data->skini.parseString( line, message ) )
data->queue.push( message );
data->mutex.unlock();
while ( data->queue.size() >= data->queueLimit ) Stk::sleep( 50 );
}
// We assume here that if someone types an "exit" message in the
// terminal window, all processing should stop.
message.type = __SK_Exit_;
data->queue.push( message );
data->sources &= ~STK_STDIN;
return NULL;
}
void midiHandler( double timeStamp, std::vector<unsigned char> *bytes, void *ptr )
{
if ( bytes->size() < 2 ) return;
// Parse the MIDI bytes ... only keep MIDI channel messages.
if ( bytes->at(0) > 239 ) return;
Messager::MessagerData *data = (Messager::MessagerData *) ptr;
Skini::Message message;
message.type = bytes->at(0) & 0xF0;
message.channel = bytes->at(0) & 0x0F;
message.time = 0.0; // realtime messages should have delta time = 0.0
message.intValues[0] = bytes->at(1);
message.floatValues[0] = (StkFloat) message.intValues[0];
if ( ( message.type != 0xC0 ) && ( message.type != 0xD0 ) ) {
if ( bytes->size() < 3 ) return;
message.intValues[1] = bytes->at(2);
if ( message.type == 0xE0 ) { // combine pithbend into single "14-bit" value
message.intValues[0] += message.intValues[1] <<= 7;
message.floatValues[0] = (StkFloat) message.intValues[0];
message.intValues[1] = 0;
}
else
message.floatValues[1] = (StkFloat) message.intValues[1];
}
while ( data->queue.size() >= data->queueLimit ) Stk::sleep( 50 );
data->mutex.lock();
data->queue.push( message );
data->mutex.unlock();
}
bool Messager :: startMidiInput( int port )
{
if ( data_.sources == STK_FILE ) {
oStream_ << "Messager::startMidiInput: already reading a scorefile ... cannot do realtime control input too!";
handleError( StkError::WARNING );
return false;
}
if ( data_.sources & STK_MIDI ) {
oStream_ << "Messager::startMidiInput: MIDI input already started.";
handleError( StkError::WARNING );
return false;
}
// First start the stdin input thread if it isn't already running
// (to allow the user to exit).
if ( !( data_.sources & STK_STDIN ) ) {
if ( this->startStdInput() == false ) {
oStream_ << "Messager::startMidiInput: unable to start input from stdin.";
handleError( StkError::WARNING );
return false;
}
}
try {
data_.midi = new RtMidiIn();
data_.midi->setCallback( &midiHandler, (void *) &data_ );
if ( port == -1 ) data_.midi->openVirtualPort();
else data_.midi->openPort( (unsigned int)port );
}
catch ( RtMidiError &error ) {
oStream_ << "Messager::startMidiInput: error creating RtMidiIn instance (" << error.getMessage() << ").";
handleError( StkError::WARNING );
return false;
}
data_.sources |= STK_MIDI;
return true;
}
bool Messager :: startSocketInput( int port )
{
if ( data_.sources == STK_FILE ) {
oStream_ << "Messager::startSocketInput: already reading a scorefile ... cannot do realtime control input too!";
handleError( StkError::WARNING );
return false;
}
if ( data_.sources & STK_SOCKET ) {
oStream_ << "Messager::startSocketInput: socket input thread already started.";
handleError( StkError::WARNING );
return false;
}
// Create the socket server.
try {
data_.socket = new TcpServer( port );
}
catch ( StkError& ) {
return false;
}
oStream_ << "Socket server listening for connection(s) on port " << port << "...";
handleError( StkError::STATUS );
// Initialize socket descriptor information.
FD_ZERO(&data_.mask);
int fd = data_.socket->id();
FD_SET( fd, &data_.mask );
data_.fd.push_back( fd );
// Start the socket thread.
if ( !socketThread_.start( (THREAD_FUNCTION)&socketHandler, &data_ ) ) {
oStream_ << "Messager::startSocketInput: unable to start socket input thread!";
handleError( StkError::WARNING );
return false;
}
data_.sources |= STK_SOCKET;
return true;
}
#if (defined(__OS_IRIX__) || defined(__OS_LINUX__) || defined(__OS_MACOSX__))
#include <sys/time.h>
#include <errno.h>
#endif
THREAD_RETURN THREAD_TYPE socketHandler(void *ptr)
{
Messager::MessagerData *data = (Messager::MessagerData *) ptr;
Skini::Message message;
std::vector<int>& fd = data->fd;
struct timeval timeout;
fd_set rmask;
int newfd;
unsigned int i;
const int bufferSize = 1024;
char buffer[bufferSize];
int index = 0, bytesRead = 0;
std::string line;
std::vector<int> fdclose;
while ( data->sources & STK_SOCKET ) {
// Use select function to periodically poll socket desriptors.
rmask = data->mask;
timeout.tv_sec = 0; timeout.tv_usec = 50000; // 50 milliseconds
if ( select( fd.back()+1, &rmask, (fd_set *)0, (fd_set *)0, &timeout ) <= 0 ) continue;
// A file descriptor is set. Check if there's a new socket connection available.
if ( FD_ISSET( data->socket->id(), &rmask ) ) {
// Accept and service new connection.
newfd = data->socket->accept();
if ( newfd >= 0 ) {
std::cout << "New socket connection made.\n" << std::endl;
// Set the socket to non-blocking mode.
Socket::setBlocking( newfd, false );
// Save the descriptor and update the masks.
fd.push_back( newfd );
std::sort( fd.begin(), data->fd.end() );
FD_SET( newfd, &data->mask );
FD_CLR( data->socket->id(), &rmask );
}
else
std::cerr << "Messager: Couldn't accept connection request!\n";
}
// Check the other descriptors.
for ( i=0; i<fd.size(); i++ ) {
if ( !FD_ISSET( fd[i], &rmask ) ) continue;
// This connection has data. Read and parse it.
bytesRead = 0;
index = 0;
#if ( defined(__OS_IRIX__) || defined(__OS_LINUX__) || defined(__OS_MACOSX__) )
errno = 0;
while (bytesRead != -1 && errno != EAGAIN) {
#elif defined(__OS_WINDOWS__)
while (bytesRead != SOCKET_ERROR && WSAGetLastError() != WSAEWOULDBLOCK) {
#endif
while ( index < bytesRead ) {
line += buffer[index];
if ( buffer[index++] == '\n' ) {
data->mutex.lock();
if ( line.compare(0, 4, "Exit") == 0 || line.compare(0, 4, "exit") == 0 ) {
// Ignore this line and assume the connection will be
// closed on a subsequent read call.
;
}
else if ( data->skini.parseString( line, message ) )
data->queue.push( message );
data->mutex.unlock();
line.erase();
}
}
index = 0;
bytesRead = Socket::readBuffer(fd[i], buffer, bufferSize, 0);
if (bytesRead == 0) {
// This socket connection closed.
FD_CLR( fd[i], &data->mask );
Socket::close( fd[i] );
fdclose.push_back( fd[i] );
}
}
}
// Now remove descriptors for closed connections.
for ( i=0; i<fdclose.size(); i++ ) {
for ( unsigned int j=0; j<fd.size(); j++ ) {
if ( fd[j] == fdclose[i] ) {
fd.erase( fd.begin() + j );
break;
}
}
// Check to see whether all connections are closed. Note that
// the server descriptor will always remain.
if ( fd.size() == 1 ) {
data->sources &= ~STK_SOCKET;
if ( data->sources & STK_MIDI )
std::cout << "MIDI input still running ... type 'exit<cr>' to quit.\n" << std::endl;
else if ( !(data->sources & STK_STDIN) ) {
// No stdin thread running, so quit now.
message.type = __SK_Exit_;
data->queue.push( message );
}
}
fdclose.clear();
}
// Wait until we're below the queue limit.
while ( data->queue.size() >= data->queueLimit ) Stk::sleep( 50 );
}
return NULL;
}
#endif
} // stk namespace