-
Notifications
You must be signed in to change notification settings - Fork 1
/
54871935_54581876.cpp
329 lines (267 loc) · 7.34 KB
/
54871935_54581876.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
/*
CS3103 Assignment 3 (PFIFO)
Completed by:
Quintus Kilbourn (54871935)
Rohit Shyla Kumar (54581876)
File Name - 54871935_54581876.cpp
Description - Multithreaded implementation of a FIFO queue
*/
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <stdlib.h>
#include <math.h>
#include <time.h>
#include <semaphore.h>
#include "MyQueue.h"
#define SSLEEP 2000000
using namespace std;
//Global variables
static pthread_mutex_t qmtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t smtx = PTHREAD_MUTEX_INITIALIZER;
sem_t empty;
int servTok = 0;
int queueLen = 0;
int totFetch = 0;
int currentSeq = 0;
int pflowed = 0;
/*TODO
Remove global vars where possible
if totfetch still global var - remove from servincrement params
commenting
txt file
*/
//Function to serve a single token
bool servIncrement(MyQueue* queue, int &fetched, int maxC,int &totFetch) ////DONT HAVE TO PASS TOTFETCH IF GLOBAL VAR
{
//Ensure the queue is not empty
if(queue->DeQueue()==-1)
{
cout<<"DeQueue not successfull"<<endl;
return 0;
}
fetched++;
totFetch++;
queueLen--;
servTok++;
//Case - served as many tokens as required by the user input
//Action - return successful
if(servTok>=maxC)
{
return 1;
}
return 0;
}
//Function to parse thread arguments
//Commong to server and flow threads that use the same set of arguments
void parseArgs(void *args,MyQueue* &queue,int &maxC,int &flowInt)
{
//Typecast the void pointers to pointer types we require and extract accordingly
MyQueue** toQ = (MyQueue**)args;
queue = toQ[1];
int** toInt = (int**)args;
maxC = *(toInt[2]);
flowInt = *(toInt[0]);
}
//Thread function to serve tokens removed from the queue
void *serve(void *args)
{
//Parse arguments
MyQueue* queue;
int maxC, flowInt;
parseArgs(args, queue, maxC, flowInt);
static int fetched;
//int totFetch = 0;
//Run until as many tokens as the user desires are served
while(servTok<maxC)
{
int ran = rand()%20+1; //generate random number between 1 and 20 to determine number of tokens to serve
fetched = 0;
//Lock the mutex so elements will not be inserted into the queue by other threads while being served
if(pthread_mutex_lock(&qmtx))
{
cout<<"Mutex Lock Error"<<endl;
exit(-1);
}
while(1){
if(!(queue->isEmpty()))
{
if(servIncrement(queue, fetched, maxC,totFetch))
{
break;
}
}
else
{
sem_post(&empty);
//cout<<"_________Sem Posted________"<<endl;
break;
}
}
cout<<"\t\t\t\t\t"<<queueLen<<"\t\t"<<fetched<<"\t\t"<<totFetch<<endl;
//Unlock the mutex so other threads may access the queue again
if(pthread_mutex_unlock(&qmtx))
{
cout<<"Mutex Unlock Error"<<endl;
exit(-1);
}
//Sleep for a constant 2 second time
usleep(SSLEEP);
}
sem_post(&empty); ////not final solution
//void *retVal = &totFetch;
return (void*)1;
}
void flowFunc(bool flag, int maxC, MyQueue* queue){
int ran;
//// 0 for pflow
if (flag == 0)
ran = rand()%5+1;
else
ran = rand()%10+1;
int placed = 0; //track number of tokens actually placed in the queue incase of overflow
//Lock the mutex so elements will not be removed from the queue by other threads while being inserted
if(pthread_mutex_lock(&qmtx))
{
cout<<"Mutex Lock Error"<<endl;
exit(-1);
}
/////////////////////////////////
if(pthread_mutex_lock(&smtx))
{
cout<<"Mutex Lock Error"<<endl;
exit(-1);
}
/////////////////////////////////
for(int i = 0; i<ran;i++)
{
//case - queue is not full
//action - insert tokens into the queue
if(!(queue->isFull()))
{
queue->EnQueue(currentSeq);
currentSeq++;
queueLen++;
placed++;
}
//case - queue is full
//action - serve overflowed tokens
else
{
servTok++;
if(servTok>=maxC)
{
break;
}
}
}
if (flag != 0)
{
cout<<placed<<"(FLOW)\t\t"<<currentSeq-1<<"\t\t\t"<<queueLen<<endl;
}
else
{
pflowed +=placed;
cout<<placed<<"(PFLOW)\t"<<currentSeq-1<<"\t\t\t"<<queueLen<<endl;
}
//////////////////////////////////////////////
if(pthread_mutex_unlock(&smtx))
{
cout<<"Mutex Unlock Error"<<endl;
exit(-1);
}
/////////////////////////////////////////
//Unlock the mutex so other threads may access the queue again
if(pthread_mutex_unlock(&qmtx))
{
cout<<"Mutex Unlock Error"<<endl;
exit(-1);
}
}
//Thread function to generate tokens and insert them into the queue
void *flow(void *args)
{
//Parse arguments
MyQueue* queue;
int maxC, flowInt;
parseArgs(args,queue,maxC,flowInt);
//static int count = 0; //static count to track sequence number of tokens being inserted into the queue
//Run until as many tokens as the user desires are served
while(servTok<maxC)
{
flowFunc(1, maxC, queue);
//Sleep for a user defined time period
usleep(flowInt*1000000);
}
return (void *) 1;
}
void* pflow(void *args){
MyQueue* queue;
int maxC, flowInt;
parseArgs(args, queue, maxC, flowInt);
while(servTok<maxC)
{
// cout<<"_________Sem Wait Called________"<<endl;
sem_wait(&empty);
if(servTok>=maxC)
return (void*) -1;
flowFunc(0, maxC, queue);
}
return (void*)1;
}
/*
Main function
Main thread creates other threads and displays outputs
*/
int main(int argc, char* argv[])
{
//Extracting command line arguments for max count of tokens to be served and flow interval
//abs() ensures no negative values crash the program
int maxC = abs(strtol(argv[1],NULL,10));
int flowInt = abs(strtol(argv[2],NULL,10));
//INITIALISE SEMAPHORES
sem_init(&empty, 0, 0);
//Store thread ids of different threads
pthread_t flowId, servId, pflowId;
MyQueue* queue = new MyQueue();
//Prepare arguments as a void pointer to pass to thread functions
void* args[] = {&flowInt, queue, &maxC};
//Print header with appropriate spacing using horizontal tabs
cout<<"The Max Token is "<< maxC << " and the interval time for the flow is "<< flowInt<<endl;
cout<<"Flow\t\t\t\t\tMyQueue\t\tServer"<<endl;
cout<<"Token Added\tLatest Sequence Num\tCurrent Length\tToken Fetched\tTotal Token Fetched"<<endl;
//set the seed of the random number generator to the current time to produce more random results
srand(time(NULL));
//Create threads and handle errors
if(pthread_create(&flowId,NULL,flow,args))
{
cout<<"Failed to create flow thread"<<endl;
exit(-1);
}
if(pthread_create(&servId,NULL,serve,args))
{
cout<<"Failed to create server thread"<<endl;
exit(-1);
}
if(pthread_create(&pflowId,NULL,pflow,args))
{
cout<<"Failed to create server thread"<<endl;
exit(-1);
}
//void* retVal; //temporarily store return value from server thread
//Wait for the thread functions to return before continuing with the main thread's functions
pthread_join(servId,NULL);
pthread_join(flowId,NULL);
pthread_join(pflowId,NULL);
sem_destroy(&empty);
//int totFetch = *((int *)retVal); //convert value returned from server thread
////////////////////////////////////////////////
//TODO get rid of global vars!!!!!!!!!!!!///////
///////////////////////////////////////////////
//Print final results
cout << "The total number of token that have been fetched by the server is " << totFetch << endl;
cout<<"The total number of token that have been generated by the flow is " << servTok + queueLen - pflowed << endl;
cout<<"The total number of token that have been generated by the pflow is " << pflowed <<endl;
cout << "The total number of token that have been dropped by the queue is " << maxC- totFetch << endl;
return 0;
}