-
Notifications
You must be signed in to change notification settings - Fork 0
/
app.js
301 lines (258 loc) · 10.2 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
const express = require('express');
const fs = require('fs');
const csv = require('csv-parser');
const multer = require('multer');
const AWS = require('aws-sdk');
const app = express();
const port = 8000;
const config = require('./config.json');
// ---- Deployment settings read from config.json ----
const {
  aws_access_key_id,
  aws_secret_access_key,
  region,
  sqs_request_url,
  sqs_response_url,
  s3_input_bucket,
  s3_output_bucket,
  input_path,
} = config;

// Shell script passed as EC2 user data when an app-tier instance is launched
// (see launchNewInstance).
const START_SCRIPT = [
  '#!/bin/bash',
  'cd /home/ubuntu/cloud_computing_project/',
  'sudo -u ubuntu node app_tier.js',
].join('\n');

// Apply credentials and region globally before any AWS client is constructed.
AWS.config.update({
  accessKeyId: aws_access_key_id,
  secretAccessKey: aws_secret_access_key,
  region,
});

// Upper-case aliases used by the rest of the file.
const SQS_REQUEST_URL = sqs_request_url;
const SQS_RESPONSE_URL = sqs_response_url;
const S3_INPUT_BUCKET = s3_input_bucket;
const S3_OUTPUT_BUCKET = s3_output_bucket;
const INPUT_PATH = input_path;
const APP_TIER_AMI_ID = 'ami-03f6854f307636580';

// ---- Autoscaling knobs ----
// Both thresholds are compared against the request-queue length in autoScale().
const SCALE_OUT_THRESHOLD = 5;
const SCALE_IN_THRESHOLD = 5;
// Scaling check period; presumably milliseconds - TODO confirm.
const SCALE_CHECK_INTERVAL = 10000;
const MAX_INSTANCES = 19;
const MIN_INSTANCES = 0;

// ---- Mutable scaling state ----
// lastActivityTime is refreshed by the POST handler on every accepted request.
let lastActivityTime = Date.now();
let lastScaleInReqTime = Date.now();
let scaleOutCooldown = false;

// ---- AWS service clients ----
const s3 = new AWS.S3();
const sqs = new AWS.SQS();
const ec2 = new AWS.EC2();

// NOTE: an earlier variant loaded dataset.csv through csv-parser into a
// `predictions` map before starting the server; that path is disabled.

// Number of app-tier instances this process believes are running or launching.
let instanceCount = 0;
// Instance ids returned by ec2.runInstances, in launch order.
const ec2InstanceSet = new Set();
/**
 * Registers the image-upload route and starts the HTTP listener.
 *
 * POST / (multipart field `inputFile`):
 *   1. stores the file in the S3 input bucket under its extension-less name,
 *   2. enqueues that name on the SQS request queue,
 *   3. makes sure at least one app-tier instance is being launched,
 *   4. polls the SQS response queue until the matching result arrives and
 *      answers with `<name>:<result>`.
 *
 * Once the server is listening, autoScale() is run every SCALE_CHECK_INTERVAL.
 */
const startServer = () => {
  console.log('server started');
  const upload = multer({ dest: 'uploads/' });

  /**
   * Polls the response queue until a message whose first comma-separated
   * field equals `filenameWithoutExtension` is received, then stores the
   * result in the S3 output bucket, deletes the message and replies.
   *
   * @param {object} receiveParams - Parameters for sqs.receiveMessage.
   * @param {object} res - Express response for the pending upload request.
   * @param {string} filenameWithoutExtension - Key identifying this request.
   * @returns {Promise<void>} Resolves after a response (result or 500) is sent.
   */
  async function handleMessage(receiveParams, res, filenameWithoutExtension) {
    try {
      while (true) {
        const receiveResult = await sqs.receiveMessage(receiveParams).promise();
        if (!receiveResult.Messages || receiveResult.Messages.length === 0) {
          // Nothing arrived within WaitTimeSeconds; poll again.
          console.log('No messages found in SQS queue');
          continue;
        }

        const message = receiveResult.Messages[0];
        const messageBody = message.Body;
        console.log('Received message:', messageBody);

        // Expected body shape: "<fileName>,<result>".
        const [returnedFileName, recognitionResult] = messageBody.split(',');
        if (returnedFileName !== filenameWithoutExtension) {
          // NOTE(review): a message meant for another request is neither
          // deleted nor released here, so it presumably stays hidden for the
          // queue's visibility timeout before its owner can read it - confirm
          // that timeout is acceptable.
          continue;
        }

        await s3.putObject({
          Bucket: S3_OUTPUT_BUCKET,
          Key: filenameWithoutExtension,
          Body: recognitionResult
        }).promise();
        await sqs.deleteMessage({
          QueueUrl: SQS_RESPONSE_URL,
          ReceiptHandle: message.ReceiptHandle
        }).promise();

        // Send the classification result to the client
        res.send(`${filenameWithoutExtension}:${recognitionResult}`);
        break;
      }
    } catch (error) {
      console.error('Error receiving message from SQS:', error);
      // Avoid a second write if the failure happened after the reply started.
      if (!res.headersSent) {
        res.status(500).send('Internal Server Error');
      }
    }
  }

  // Handle POST request for image upload
  app.post('/', upload.single('inputFile'), async (req, res) => {
    // Validate first: reading req.file.originalname without a file would throw.
    if (!req.file) {
      return res.status(400).send('No image file uploaded!');
    }
    console.log('request recieved: ', req.file.originalname);

    const filename = req.file.originalname;
    const filenameWithoutExtension = filename.split('.')[0]; // Remove the file extension

    // Upload image to S3 input bucket
    try {
      await s3.upload({
        Bucket: S3_INPUT_BUCKET,
        Key: filenameWithoutExtension,
        Body: fs.createReadStream(req.file.path)
      }).promise();
    } catch (err) {
      console.error('Error uploading image to S3:', err);
      return res.status(500).send('Internal Server Error');
    } finally {
      // multer leaves the temp file on disk; remove it once the upload settled.
      fs.unlink(req.file.path, (unlinkErr) => {
        if (unlinkErr) {
          console.error('Error removing temporary upload:', unlinkErr);
        }
      });
    }

    // Send message to SQS request queue
    try {
      await sqs.sendMessage({
        MessageBody: filenameWithoutExtension,
        QueueUrl: SQS_REQUEST_URL
      }).promise();
    } catch (err) {
      console.error('Error sending message to SQS:', err);
      return res.status(500).send('Internal Server Error');
    }

    // Poll SQS response queue for result
    const receiveParams = {
      QueueUrl: SQS_RESPONSE_URL,
      //MaxNumberOfMessages: 1,
      WaitTimeSeconds: 20
    };

    try {
      if (instanceCount === 0) {
        // Do not wait for the periodic timer when nothing can serve the request.
        console.log('post API, no ec2Instance found');
        await autoScale();
      }
      lastActivityTime = Date.now(); // Update last activity time

      // Start polling for messages
      await handleMessage(receiveParams, res, filenameWithoutExtension);
    } catch (error) {
      console.error('Error handling message:', error);
      if (!res.headersSent) {
        res.status(500).send('Internal Server Error');
      }
    }
  });

  app.listen(port, () => {
    console.log(`Server listening on port ${port}`);
    // Start autoscaling task
    setInterval(autoScale, SCALE_CHECK_INTERVAL);
  });
}
const { v4: uuidv4 } = require('uuid'); // Importing UUID library
/**
 * Launches one additional app-tier EC2 instance from APP_TIER_AMI_ID.
 *
 * Does nothing (beyond logging) once instanceCount has reached MAX_INSTANCES.
 * On success the new instance id is added to ec2InstanceSet; on failure the
 * error is logged and the instanceCount reservation is rolled back.
 *
 * @returns {Promise<void>}
 */
async function launchNewInstance() {
  const encodedStartScript = Buffer.from(START_SCRIPT).toString('base64');

  if (instanceCount >= MAX_INSTANCES) {
    console.log('max instances reached, increase total limit');
    return;
  }

  // Count the instance before the API call so callers that run while the
  // launch is pending already see the reserved slot.
  instanceCount += 1;

  try {
    const launchRequest = {
      ImageId: APP_TIER_AMI_ID,
      InstanceType: 't2.micro',
      MinCount: 1,
      MaxCount: 1,
      KeyName: 'my_key_pair',
      SecurityGroupIds: ['sg-0c159ba62ee2666e9'],
      UserData: encodedStartScript,
      // Give every instance a unique Name tag.
      TagSpecifications: [
        {
          ResourceType: 'instance',
          Tags: [{ Key: 'Name', Value: `app-tier-instance-${uuidv4()}` }],
        },
      ],
    };

    const launchResult = await ec2.runInstances(launchRequest).promise();
    const launchedId = launchResult.Instances[0].InstanceId;
    console.log("Successfully launched instance", launchedId);
    ec2InstanceSet.add(launchedId);
  } catch (error) {
    console.error("Failed to launch instance:", error);
    // Give the reserved slot back.
    instanceCount -= 1;
  }
}
/**
 * Reads the ApproximateNumberOfMessages attribute of an SQS queue.
 *
 * @param {string} queueUrl - URL of the queue to inspect.
 * @returns {Promise<number>} The attribute parsed as a base-10 integer.
 */
async function getQueueLength(queueUrl) {
  const { Attributes: attributes } = await sqs
    .getQueueAttributes({
      QueueUrl: queueUrl,
      AttributeNames: ['ApproximateNumberOfMessages'],
    })
    .promise();

  // SQS returns the attribute as a string.
  return Number.parseInt(attributes.ApproximateNumberOfMessages, 10);
}
/**
 * Requests termination of a single EC2 instance.
 *
 * instanceCount is decremented before the API call and restored if the call
 * fails; the id is dropped from ec2InstanceSet only after a successful request.
 *
 * @param {string} instanceId - Id of the instance to terminate.
 * @returns {Promise<void>}
 */
async function terminateInstance(instanceId) {
  // Release the slot up front; the catch branch undoes this on failure.
  instanceCount -= 1;

  try {
    await ec2.terminateInstances({ InstanceIds: [instanceId] }).promise();
    console.log(`Successfully requested termination of instance ${instanceId}`);
    ec2InstanceSet.delete(instanceId);
  } catch (error) {
    instanceCount += 1;
    console.error("Failed to terminate instance:", error);
  }
}
/**
 * Runs one autoscaling pass driven by the request-queue length.
 *
 * Decision order:
 *   - no instances and a non-empty queue  -> launch one immediately;
 *   - queue >= SCALE_OUT_THRESHOLD per instance (and below MAX_INSTANCES)
 *     -> launch one;
 *   - queue small enough that one fewer instance would still be under the
 *     scale-out threshold -> terminate the first tracked instance.
 *
 * The scale-in test uses (instanceCount - 1) so the two thresholds do not
 * meet: otherwise every pass would either launch or terminate, and the only
 * instance would be terminated while requests are still queued.
 *
 * Errors are logged and swallowed so a failed pass never breaks the caller
 * (the periodic timer or the POST handler).
 *
 * @returns {Promise<void>}
 */
async function autoScale() {
  try {
    const queueLength = await getQueueLength(SQS_REQUEST_URL);
    console.log('Queue length:', queueLength);
    console.log("checking scaling");
    console.log("instanceCount = ", instanceCount);

    if (instanceCount === 0) {
      if (queueLength > 0) {
        console.log("No instance detected, launching right away");
        await launchNewInstance();
      }
      return;
    }

    // Scale Out: If pending requests exceed the threshold, launch a new instance.
    if (queueLength >= SCALE_OUT_THRESHOLD * instanceCount && instanceCount < MAX_INSTANCES) {
      console.log("Scaling out due to high load...");
      await launchNewInstance();
      return;
    }

    // Scale In: only when the remaining instances could absorb the backlog
    // without immediately re-triggering a scale-out.
    if (instanceCount > MIN_INSTANCES && queueLength <= SCALE_IN_THRESHOLD * (instanceCount - 1)) {
      // Sets iterate in insertion order, so this is the earliest tracked id.
      const [firstTracked] = ec2InstanceSet;
      if (firstTracked === undefined) {
        // instanceCount is raised before runInstances resolves, so the set can
        // still be empty while a launch is in flight; nothing to terminate yet.
        console.log("Scale in skipped: no tracked instance id available yet");
        return;
      }
      console.log("Scaling in due to low load...");
      await terminateInstance(firstTracked);
    }
  } catch (error) {
    console.log("Error in autoscaling ->", error);
  }
}
// Entry point: invoke startServer() as soon as this module is loaded.
startServer();