diff --git a/CHANGELOG.md b/CHANGELOG.md index 145fab5..382727e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +### 1.1.2 + +make things better + +### 1.0.1 + +Add socket.io support + ### 1.0.0 Initial release of `thor`. diff --git a/README.md b/README.md index 6b97fe5..d692608 100644 --- a/README.md +++ b/README.md @@ -1,151 +1,231 @@ -# Thor - -Thor is WebSocket benchmarking/load generator. There are a lot of benchmarking -tools for HTTP servers. You've got ab, siege, wrk and more. But all these tools -only work with plain ol HTTP and have no support for WebSockets - even if they did -they wouldn't be suitable, as they would be testing short running HTTP requests -instead of long running HTTP requests with a lot of messaging traffic. Thor -fixes all of this. - -### Dependencies - -Thor requires Node.js to be installed on your system. If you don't have Node.js -installed you can download it from http://nodejs.org or build it from the github -source repository: http://github.com/joyent/node. - -Once you have Node.js installed, you can use the bundled package manager `npm` to -install this module: - -``` -npm install -g thor -``` - -The `-g` command flag tells `npm` to install the module globally on your system. - -### Usage - -``` -thor [options] -``` - -Thor can hit multiple URL's at once; this is useful if you are testing your -reverse proxies, load balancers or just simply multiple applications. The url -that you supply to `thor` should be written in a WebSocket compatible format -using the `ws` or `wss` protocols: - -``` -thor --amount 5000 ws://localhost:8080 wss://localhost:8081 -``` - -The snippet above will open up `5000` connections against the regular -`ws://localhost:8080` and also `5000` connections against the *secured* -`wss://localhost:8081` server, so a total of `10000` connections will be made. - -One thing to keep in mind is you probably need to bump the amount of file -descriptors on your local machine if you start testing WebSockets. Set the -`ulimit -n` on machine as high as possible. If you do not know how to do this, -Google it. - -#### Options - -``` - Usage: thor [options] ws://localhost - - Options: - - -h, --help output usage information - -A, --amount the amount of persistent connections to generate - -C, --concurrent how many concurrent-connections per second - -M, --messages messages to be send per connection - -P, --protocol WebSocket protocol version - -B, --buffer size of the messages that are send - -W, --workers workers to be spawned - -G, --generator custom message generators - -M, --masked send the messaged with a mask - -b, --binary send binary messages instead of utf-8 - -V, --version output the version number -``` - -Some small notes about the options: - -- `--protocol` is the protocol version number. If you want to use the *HyBi drafts - 07-12* use `8` as argument or if you want to use the *HyBi drafts 13-17* - drafts which are the default version use `13`. -- `--buffer` should be size of the message in bytes. -- `--workers` as Node.js is single threaded this sets the amount of sub - processes to handle all the heavy lifting. - -### Custom messages - -Some WebSocket servers have their own custom messaging protocol. In order to -work with those servers we introduced a concept called `generators` a generator -is a small JavaScript file that can output `utf8` and `binary` messages. It uses -a really simple generator by default. - -Checkout https://github.com/observing/thor/blob/master/generator.js for an -example of a generator. - -``` -thor --amount 1000 --generator ws://localhost:8080 -``` - -### Example - -``` -thor --amount 1000 --messages 100 ws://localhost:8080 -``` - -This will hit the WebSocket server that runs on localhost:8080 with 1000 -connections and sends 100 messages over each established connection. Once `thor` -is done with smashing your connections it will generate a detailed report: - -``` -Thor: version: 1.0.0 - -God of Thunder, son of Odin and smasher of WebSockets! - -Thou shall: -- Spawn 4 workers. -- Create all the concurrent/parallel connections. -- Smash 1000 connections with the mighty Mjölnir. - -The answers you seek shall be yours, once I claim what is mine. - -Connecting to ws://localhost:8080 - - Opened 100 connections - Opened 200 connections - Opened 300 connections - Opened 400 connections - Opened 500 connections - Opened 600 connections - Opened 700 connections - Opened 800 connections - Opened 900 connections - Opened 1000 connections - - -Online 15000 milliseconds -Time taken 31775 milliseconds -Connected 1000 -Disconnected 0 -Failed 0 -Total transferred 120.46MB -Total received 120.43MB - -Durations (ms): - - min mean stddev median max -Handshaking 217 5036 4094 3902 14451 -Latency 0 215 104 205 701 - -Percentile (ms): - - 50% 66% 75% 80% 90% 95% 98% 98% 100% -Handshaking 3902 6425 8273 9141 11409 12904 13382 13945 14451 -Latency 205 246 266 288 371 413 437 443 701 -``` - -### License - -MIT +# Thor + +Thor is WebSocket benchmarking/load generator. There are a lot of benchmarking +tools for HTTP servers. You've got ab, siege, wrk and more. But all these tools +only work with plain ol HTTP and have no support for WebSockets - even if they did +they wouldn't be suitable, as they would be testing short running HTTP requests +instead of long running HTTP requests with a lot of messaging traffic. Thor +fixes all of this. + +### Dependencies + +Thor requires Node.js to be installed on your system. If you don't have Node.js +installed you can download it from http://nodejs.org or build it from the github +source repository: http://github.com/joyent/node. + +Once you have Node.js installed, you can use the bundled package manager `npm` to +install this module: + +``` +npm install -g git://github.com/iorichina/thor.git +``` + +The `-g` command flag tells `npm` to install the module globally on your system. + +### Usage + +``` +thor [options] +socketio [options] +``` + +Thor can hit multiple URL's at once; this is useful if you are testing your +reverse proxies, load balancers or just simply multiple applications. The url +that you supply to `thor` should be written in a WebSocket compatible format +using the `ws` or `wss` protocols: + +``` +thor --amount 5000 ws://localhost:8080 wss://localhost:8081 +``` +or use http/https if and only if ur using socket.io(nodejs/java/etc) in server +``` +socketio --amount 5000 http://localhost:8080/ https://localhost:8081/ +``` + +The snippet above will open up `5000` connections against the regular +`ws://localhost:8080` and also `5000` connections against the *secured* +`wss://localhost:8081` server, so a total of `10000` connections will be made. + +One thing to keep in mind is you probably need to bump the amount of file +descriptors on your local machine if you start testing WebSockets. Set the +`ulimit -n` on machine as high as possible. If you do not know how to do this, +Google it. + +And the other thing u have to know is that, one ip can only create max to 60 thousands connections, +which is limited by TCP/IP protocal. U can special the [@@one_of_the_machine_vip] after the url. + +#### thor Options + +``` + Usage: thor [options] urls + + urls like + ws://localhost:8080/?params + ws://localhost:8080/?params ws://localhost:8080/socket.io/?transport=websocket + ws://localhost:8080/?params@@192.168.102.33 ws://localhost:8080/socket.io/?transport=websocket@@192.168.102.53 + + Options: + + -h, --help output usage information + -A, --amount the amount of persistent connections to generate, default 10000 + -C, --concurrent [connections] [deprecated]how many concurrent-connections per second, default 0 + -M, --messages [messages] number of messages to be send per connection, default 0 + -P, --protocol [protocol] WebSocket protocol version, default 13 + -B, --buffer [size] size of the messages that are send, default 1024 + -W, --workers [cpus] workers to be spawned, default cpus.length + -G, --generator [file] custom message generators + -M, --masked send the messaged with a mask + -b, --binary send binary messages instead of utf-8 + --SE, --serverEngine [engine] "socket.io"/"engine.io"(nodejs), "netty-socketio"(java) must be specified if ur using these engine in ur server + --PI, --pingInterval [seconds] seconds for doing ping to keep-alive, default 50 + --SI, --statInterval [seconds] show stat info interval, default 60 + --RT, --runtime [seconds] timeout to close socket(seconds), default to unlimited, u must stop by ctrl+c + -V, --version output the version number +``` + +Some small notes about the options: + +- `--protocol` is the protocol version number. If you want to use the *HyBi drafts + 07-12* use `8` as argument or if you want to use the *HyBi drafts 13-17* + drafts which are the default version use `13`. +- `--buffer` should be size of the message in bytes. +- `--workers` as Node.js is single threaded this sets the amount of sub + processes to handle all the heavy lifting. + +#### socketio Options + +``` + Usage: socketio [options] urls + + urls like + http://localhost:8080/ + http://localhost:8080/?params + http://localhost:8080/?params@@192.168.102.53 + + Options: + + -h, --help output usage information + -A, --amount the amount of persistent connections to generate, default 10000 + -C, --concurrent [connections] [deprecated]how many concurrent-connections per second, default 0 + -M, --messages [messages] number of messages to be send per connection, default 0 + -P, --protocol [protocol] WebSocket protocol version, default 13 + -B, --buffer [size] size of the messages that are send, default 1024 + -W, --workers [cpus] workers to be spawned, default cpus.length + -G, --generator [file] custom message generators + -M, --masked send the messaged with a mask + -b, --binary send binary messages instead of utf-8 + --TP, --transport [transport] "polling"/"websocket" default websocket + --PI, --pingInterval [seconds] seconds for doing ping to keep-alive, default 50 + --SI, --statInterval [seconds] show stat info interval, default 60 + --RT, --runtime [seconds] timeout to close socket(seconds), default to unlimited, u must stop by ctrl+c + -V, --version output the version number +``` + + +### Custom messages + +Some WebSocket servers have their own custom messaging protocol. In order to +work with those servers we introduced a concept called `generators` a generator +is a small JavaScript file that can output `utf8` and `binary` messages. It uses +a really simple generator by default. + +Checkout https://github.com/observing/thor/blob/master/generator.js for an +example of a generator. + +``` +thor --amount 1000 --generator ws://localhost:8080 +``` + +### Example + +``` +thor --amount 5000 ws://localhost:8080 +``` + +This will hit the WebSocket server that runs on localhost:8080 with 1000 +connections and sends 100 messages over each established connection. Once `thor` +is done with smashing your connections it will generate a detailed report: + +``` +Thor: version: 1.1.2 + +God of Thunder, son of Odin and smasher of WebSockets! + +Thou shall: +- Spawn 4 workers. +- Create all the concurrent connections. +- Smash 5000 connections . + +Connecting to ws://localhost:8080 + + ◜ Progress :: Created 0, Active 0, @2015-11-11 17:31:15 ==> this will hide after process end + +Online 96728 milliseconds +Time taken 96728 milliseconds +Connected 2211 +Disconnected 0 +Failed 2789 +Total transferred 658.86kB +Total received 631.64kB + +Durations (ms): + + min mean stddev median max +Handshaking 171 3354 1631 2915 7737 +Latency NaN NaN NaN NaN NaN + +Percentile (ms): + + 50% 66% 75% 80% 90% 95% 98% 98% 100% +Handshaking 2915 3627 4705 4827 5811 6412 7686 7712 7737 +Latency NaN NaN NaN NaN NaN NaN NaN NaN NaN + +Received errors: + +2789x undefined +``` + +### Example + +``` +socketio --amount 5000 --RT 300 --SI 30 http://localhost:8080/ +``` + +``` +Thor: version: 1.1.2 + +God of Thunder, son of Odin and smasher of WebSockets! + +Thou shall: +- Spawn 4 workers. +- Create all the concurrent connections. +- Smash 5 connections. + +Connecting to http://localhost:8080/ + + + +Online 137539 milliseconds +Time taken 137539 milliseconds +Connected 5000 +Disconnected 0 +Failed 0 +Total transferred 14.59kB +Total received 14.68kB + +Durations (ms): + + min mean stddev median max +Handshaking 598 614 13 610 638 +Latency NaN NaN NaN NaN NaN + +Percentile (ms): + + 50% 66% 75% 80% 90% 95% 98% 98% 100% +Handshaking 610 617 617 638 638 638 638 638 638 +Latency NaN NaN NaN NaN NaN NaN NaN NaN NaN +``` + +### License + +MIT diff --git a/bin/socketio b/bin/socketio new file mode 100755 index 0000000..e36f537 --- /dev/null +++ b/bin/socketio @@ -0,0 +1,287 @@ +#!/usr/bin/env node +'use strict'; + +var Metrics = require('../metrics') + , colors = require('colors') + , async = require('async') + , path = require('path') + , os = require('os'); + +// +// Setup the Command-Line Interface. +// +var cli = require('commander'); + +cli.usage(['[options] urls', + ,' urls like' + ,' http://localhost:8080/' + ,' http://localhost:8080/?params' + ,' http://localhost:8080/?params@@192.168.102.53'].join("\n")) + .option('-A, --amount ', 'the amount of persistent connections to generate, default 10000', parseInt, 10000) + .option('-C, --concurrent [connections]', '[deprecated]how many concurrent-connections per second, default 0', parseInt, 0) + .option('-M, --messages [messages]', 'number of messages to be send per connection, default 0', parseInt, 0) + .option('-P, --protocol [protocol]', 'WebSocket protocol version, default 13', parseInt, 13) + .option('-B, --buffer [size]', 'size of the messages that are send, default 1024', parseInt, 1024) + .option('-W, --workers [cpus]', 'workers to be spawned, default cpus.length', parseInt, os.cpus().length) + .option('-G, --generator [file]', 'custom message generators') + .option('-M, --masked', 'send the messaged with a mask') + .option('-b, --binary', 'send binary messages instead of utf-8') + .option('-t, --connectTimeout [connectTimeout]', 'default 5(s)', parseInt, 5) + .option('-l, --logError [logError]', 'default 0 means false, otherwise true', parseInt, 0) + .option('--TP, --transport [transport]', '"polling"/"websocket" default websocket', 'websocket') + .option('--PI, --pingInterval [seconds]', 'seconds for doing ping to keep-alive, default 50', parseInt, 50) + .option('--SI, --statInterval [seconds]', 'show stat info interval, default 60', parseInt, 60) + .option('--RT, --runtime [seconds]', 'timeout to close socket(seconds), default to unlimited, u must stop by ctrl+c', parseInt, -1) + .version(require('../package.json').version) + .parse(process.argv); + + +// +// Check if all required arguments are supplied, if we don't have a valid url we +// should bail out +// +if (!cli.args.length) return [ + '愚かな人類よ! You forgot to supply the urls. Type -h for help.' +].forEach(function stderr(line) { + console.error(line); +}); + +// +// By Odin's beard, unleash thunder! +// +var cluster = require('cluster') + , workers = cli.workers || 1 + , ids = Object.create(null) + , concurrents = Object.create(null) + , concurrent_timestamps = Object.create(null) + , connections = 0 + , close_connections = 0 + , error_connections = 0 + , received = 0 + , robin = [] + , workers = Math.min(cli.amount * cli.args.length, workers) + , forked_workers = workers; + +cluster.setupMaster({ + exec: path.resolve(__dirname, '../worker-socketio.js') + , silent: false + , args: [ + cli.generator + ? path.resolve(process.cwd(), cli.generator) + : path.resolve(__dirname, '../generator.js'), + cli.protocol, + !!cli.masked, + !!cli.binary, + cli.statInterval, + cli.logError + ] +}); + +while (workers--) cluster.fork(); + +Object.keys(cluster.workers).forEach(function each(id) { + var worker = cluster.workers[id]; + /** + * message from worker via IPC + * @param {Array|Object} data) Array of Object or Object{id:connection_id, wid:worker_id, ...} + * @return null + */ + worker.on('message', function message(data) { + + var datas = []; + if (data.collection) { + datas = data.datas; + }else{ + datas = [data]; + } + for (var i = 0; i < datas.length; i++) { + var data = datas[i]; + if ('concurrent' in data && 'cur_time' in data) { + if (!concurrent_timestamps[data.wid] || concurrent_timestamps[data.wid] < data.cur_time) { + concurrents[data.wid] = data.concurrent; + } + } + + switch (data.type) { + case 'open': + metrics.handshaken(data); + worker.emit('open::'+ data.id); + + // Output the connection progress + ++connections; + break; + + case 'close': + delete ids[data.id]; + + metrics.close(data); + ++close_connections; + break; + + case 'error': + delete ids[data.id]; + + metrics.error(data); + ++error_connections; + break; + + case 'message': + received++; + metrics.message(data); + } + + } + + // + // Check if we have processed all connections so we can quit cleanly. + // + if (!Object.keys(ids).length) process.exit(); + }); + + // Add our worker to our round robin queue so we can balance all our requests + // across the different workers that we spawned. + robin.push(worker); +}); + +// +// Output live, real-time stats. +// +function live() { + var frames = live.frames + , len = frames.length + , interval = cli.statInterval * 1000 + , i = 0; + + live.interval = setInterval(function tick() { + var active = Object.keys(concurrents).reduce(function (count, id) { + return count + (concurrents[id] || 0); + }, 0); + + // process.stdout.write('\r'+ frames[i++ % len] +' Progress :: '.white + [ + console.log('\r'+ frames[i++ % len] +' Progress :: '.white + [ + 'Created '.white + connections.toString().green, + 'Active '.white + active.toString().green, + 'Error '.white + error_connections.toString().green, + 'Received '.white + received.toString().green, + '@'.white + new Date().toLocaleString().green + ].join(', ')); + }, interval); +} + +/** + * Live frames. + * + * @type {Array} + * @api private + */ +live.frames = [ + ' \u001b[96m◜ \u001b[90m' + , ' \u001b[96m◠ \u001b[90m' + , ' \u001b[96m◝ \u001b[90m' + , ' \u001b[96m◞ \u001b[90m' + , ' \u001b[96m◡ \u001b[90m' + , ' \u001b[96m◟ \u001b[90m' +]; + +/** + * Stop the live stats from running. + * + * @api private + */ +live.stop = function stop() { + // process.stdout.write('\u001b[2K'); + console.log(''); + clearInterval(live.interval); +}; + +// +// Up our WebSocket socket connections. +// +[ + '' + , 'Thor: version: '+ cli._version + , '' + , 'God of Thunder, son of Odin and smasher of WebSockets!' + , '' + , 'Thou shall:' + , '- Spawn '+ forked_workers +' workers.' + , '- Create '+ (cli.concurrent || 'all the') + ' concurrent connections.' + , '- Smash '+ (cli.amount || 'infinite') +' connections.' + , '' +].forEach(function stdout(line) { + console.log(line); +}); + +// +// Metrics collection. +// +var metrics = new Metrics(cli.amount * cli.args.length); + +// Iterate over all the urls so we can target multiple locations at once, which +// is helpfull if you are testing multiple loadbalancer endpoints for example. +async.forEach(cli.args, function forEach(url, done) { + var i = cli.amount + , completed = 0 + , goOnTaskQueueWhenConcurrentLimited = (cli.concurrent && cli.concurrent < cli.amount); + + console.log('Connecting to %s', url); + + url = url.split('@@'); + var localaddr = url.length > 1 ? url[1] : null; + url = url[0]; + // + // Create a simple WebSocket connection generator. + // + var queue = async.queue(function working(id, fn) { + var worker = robin.shift(); + + // Register the id, so we can keep track of the connections that we still + // need to process. + ids[id] = 1; + + // Process the connections + worker.send({ url: url, size: cli.buffer, messages: cli.messages, id: id + , localaddr: localaddr + , transport: cli.transport.split(',') + , pingInterval: cli.pingInterval + , nextTask: goOnTaskQueueWhenConcurrentLimited + , runtime: cli.runtime + , connectTimeout: cli.connectTimeout + }); + + // do it if cuncurrent is not 0 and smaller than the amount of all tcp connections + if (goOnTaskQueueWhenConcurrentLimited) { + worker.once('open::'+ id, fn); + }; + + // Add the worker back at the end of the round robin queue. + robin.push(worker); + }, cli.concurrent || Infinity); + + // When all the events are processed successfully we should call.. back ;P + queue.drain = done; + + // Add all connections to the processing queue; + while (i--) queue.push(url + (localaddr ? '::'+localaddr : '') +'::'+ i); +}, function established(err) { + metrics.established(); +}); + +// +// We are setup, everything is running +// +console.log(''); +// live(); +setTimeout(live, 1000); + +process.once('SIGINT', function end() { + robin.forEach(function nuke(worker) { + try { worker.send({ shutdown: true }); } + catch (e) {} + }); +}); + +process.once('exit', function summary() { + live.stop(); + metrics.established().stop().summary(); +}); diff --git a/bin/thor b/bin/thor index 9db08f2..3517991 100755 --- a/bin/thor +++ b/bin/thor @@ -12,26 +12,34 @@ var Metrics = require('../metrics') // var cli = require('commander'); -cli.usage('[options] ws://localhost') - .option('-A, --amount ', 'the amount of persistent connections to generate', parseInt, 10000) - .option('-C, --concurrent ', 'how many concurrent-connections per second', parseInt, 0) - .option('-M, --messages ', 'messages to be send per connection', parseInt, 1) - .option('-P, --protocol ', 'WebSocket protocol version', parseInt, 13) - .option('-B, --buffer ', 'size of the messages that are send', parseInt, 1024) - .option('-W, --workers ', 'workers to be spawned', parseInt, os.cpus().length) - .option('-G, --generator ', 'custom message generators') +cli.usage(['[options] urls', + ,' urls like' + ,' ws://localhost:8080/?params' + ,' ws://localhost:8080/?params ws://localhost:8080/socket.io/?transport=websocket' + ,' ws://localhost:8080/?params@@192.168.102.33 ws://localhost:8080/socket.io/?transport=websocket@@192.168.102.53'].join("\n")) + .option('-A, --amount ', 'the amount of persistent connections to generate, default 10000', parseInt, 10000) + .option('-C, --concurrent [connections]', '[deprecated]how many concurrent-connections per second, default 0', parseInt, 0) + .option('-M, --messages [messages]', 'number of messages to be send per connection, default 0', parseInt, 0) + .option('-P, --protocol [protocol]', 'WebSocket protocol version, default 13', parseInt, 13) + .option('-B, --buffer [size]', 'size of the messages that are send, default 1024', parseInt, 1024) + .option('-W, --workers [cpus]', 'workers to be spawned, default cpus.length', parseInt, os.cpus().length) + .option('-G, --generator [file]', 'custom message generators') .option('-M, --masked', 'send the messaged with a mask') .option('-b, --binary', 'send binary messages instead of utf-8') + .option('--SE, --serverEngine [engine]', '"socket.io"/"engine.io"(nodejs), "netty-socketio"(java) must be specified if ur using these engine in ur server') + .option('--PI, --pingInterval [seconds]', 'seconds for doing ping to keep-alive, default 50', parseInt, 50) + .option('--SI, --statInterval [seconds]', 'show stat info interval, default 60', parseInt, 60) + .option('--RT, --runtime [seconds]', 'timeout to close socket(seconds), default to unlimited, u must stop by ctrl+c', parseInt, -1) .version(require('../package.json').version) .parse(process.argv); + // // Check if all required arguments are supplied, if we don't have a valid url we // should bail out // if (!cli.args.length) return [ - 'Thor:' - , 'Odin is disappointed in you... pity human! You forgot to supply the urls.' + '愚かな人類よ! You forgot to supply the urls. Type -h for help.' ].forEach(function stderr(line) { console.error(line); }); @@ -45,7 +53,9 @@ var cluster = require('cluster') , concurrents = Object.create(null) , connections = 0 , received = 0 - , robin = []; + , robin = [] + , workers = Math.min(cli.amount * cli.args.length, workers) + , forked_workers = workers; cluster.setupMaster({ exec: path.resolve(__dirname, '../mjolnir.js') @@ -56,7 +66,8 @@ cluster.setupMaster({ : path.resolve(__dirname, '../generator.js'), cli.protocol, !!cli.masked, - !!cli.binary + !!cli.binary, + cli.statInterval ] }); @@ -66,7 +77,16 @@ Object.keys(cluster.workers).forEach(function each(id) { var worker = cluster.workers[id]; worker.on('message', function message(data) { - if ('concurrent' in data) concurrents[data.id] = data.concurrent; + + var datas = []; + if (data.collection) { + datas = data.datas; + }else{ + datas = [data]; + } + for (var i = 0; i < datas.length; i++) { + var data = datas[i]; + if ('concurrent' in data) concurrents[data.workerid] = data.concurrent; switch (data.type) { case 'open': @@ -94,6 +114,8 @@ Object.keys(cluster.workers).forEach(function each(id) { metrics.message(data); } + } + // // Check if we have processed all connections so we can quit cleanly. // @@ -111,7 +133,7 @@ Object.keys(cluster.workers).forEach(function each(id) { function live() { var frames = live.frames , len = frames.length - , interval = 100 + , interval = cli.statInterval * 1000 , i = 0; live.interval = setInterval(function tick() { @@ -121,7 +143,8 @@ function live() { process.stdout.write('\r'+ frames[i++ % len] +' Progress :: '.white + [ 'Created '.white + connections.toString().green, - 'Active '.white + active.toString().green + 'Active '.white + active.toString().green, + '@'.white + new Date().toLocaleString().green ].join(', ')); }, interval); } @@ -161,11 +184,9 @@ live.stop = function stop() { , 'God of Thunder, son of Odin and smasher of WebSockets!' , '' , 'Thou shall:' - , '- Spawn '+ cli.workers +' workers.' - , '- Create '+ (cli.concurrent || 'all the') + ' concurrent/parallel connections.' - , '- Smash '+ (cli.amount || 'infinite') +' connections with the mighty Mjölnir.' - , '' - , 'The answers you seek shall be yours, once I claim what is mine.' + , '- Spawn '+ forked_workers +' workers.' + , '- Create '+ (cli.concurrent || 'all the') + ' concurrent connections.' + , '- Smash '+ (cli.amount || 'infinite') +' connections.' , '' ].forEach(function stdout(line) { console.log(line); @@ -180,10 +201,14 @@ var metrics = new Metrics(cli.amount * cli.args.length); // is helpfull if you are testing multiple loadbalancer endpoints for example. async.forEach(cli.args, function forEach(url, done) { var i = cli.amount - , completed = 0; + , completed = 0 + , goOnTaskQueueWhenConcurrentLimited = (cli.concurrent && cli.concurrent < cli.amount); console.log('Connecting to %s', url); + url = url.split('@@'); + var localaddr = url.length > 1 ? url[1] : null; + url = url[0]; // // Create a simple WebSocket connection generator. // @@ -195,8 +220,18 @@ async.forEach(cli.args, function forEach(url, done) { ids[id] = 1; // Process the connections - worker.send({ url: url, size: cli.buffer, messages: cli.messages, id: id }); - worker.once('open::'+ id, fn); + worker.send({ url: url, size: cli.buffer, messages: cli.messages, id: id + , localaddr: localaddr + , serverEngine: cli.serverEngine + , pingInterval: cli.pingInterval + , nextTask: goOnTaskQueueWhenConcurrentLimited + , runtime: cli.runtime + }); + + // do it if cuncurrent is not 0 and smaller than the amount of all tcp connections + if (goOnTaskQueueWhenConcurrentLimited) { + worker.once('open::'+ id, fn); + }; // Add the worker back at the end of the round robin queue. robin.push(worker); @@ -206,7 +241,7 @@ async.forEach(cli.args, function forEach(url, done) { queue.drain = done; // Add all connections to the processing queue; - while (i--) queue.push(url +'::'+ i); + while (i--) queue.push(url + (localaddr ? '::'+localaddr : '') +'::'+ i); }, function established(err) { metrics.established(); }); diff --git a/generator.js b/generator.js index b51819c..97ca9fc 100644 --- a/generator.js +++ b/generator.js @@ -1,46 +1,46 @@ -'use strict'; - -/** - * Generate a UTF-8 messages that we will be send to a connected client. - * - * @async - * @param {Number} size The specified in bytes for the message. - * @param {Function} fn The callback function for the data. - * @public - */ -exports.utf8 = function utf(size, fn) { - var key = 'utf8::'+ size - , cached = cache[key]; - - // We have a cached version of this size, return that instead. - if (cached) return fn(undefined, cached); - - cached = cache[key] = new Buffer(size).toString('utf-8'); - fn(undefined, cached); -}; - -/** - * Generate a binary message that we will be send to a connected client. - * - * @async - * @param {Number} size The specified in bytes for the message. - * @param {Function} fn The callback function for the data. - * @public - */ -exports.binary = function binary(size, fn) { - var key = 'binary::'+ size - , cached = cache[key]; - - // We have a cached version of this size, return that instead. - if (cached) return fn(undefined, cached); - - cached = cache[key] = new Buffer(size); - fn(undefined, cached); -}; - -// -// The following is not needed to create a session file. We don't want to -// re-create & re-allocate memory every time we receive a message so we cache -// them in a variable. -// -var cache = Object.create(null); +'use strict'; + +/** + * Generate a UTF-8 messages that we will be send to a connected client. + * + * @async + * @param {Number} size The specified in bytes for the message. + * @param {Function} fn The callback function for the data. + * @public + */ +exports.utf8 = function utf(size, fn) { + var key = 'utf8::'+ size + , cached = cache[key]; + + // We have a cached version of this size, return that instead. + if (cached) return fn(undefined, cached); + + cached = cache[key] = new Buffer(size).toString('utf-8'); + fn(undefined, cached); +}; + +/** + * Generate a binary message that we will be send to a connected client. + * + * @async + * @param {Number} size The specified in bytes for the message. + * @param {Function} fn The callback function for the data. + * @public + */ +exports.binary = function binary(size, fn) { + var key = 'binary::'+ size + , cached = cache[key]; + + // We have a cached version of this size, return that instead. + if (cached) return fn(undefined, cached); + + cached = cache[key] = new Buffer(size); + fn(undefined, cached); +}; + +// +// The following is not needed to create a session file. We don't want to +// re-create & re-allocate memory every time we receive a message so we cache +// them in a variable. +// +var cache = Object.create(null); diff --git a/metrics.js b/metrics.js index f0d5c0d..7fef6a5 100644 --- a/metrics.js +++ b/metrics.js @@ -1,260 +1,260 @@ -'use strict'; - -var Stats = require('fast-stats').Stats - , colors = require('colors') - , sugar = require('sugar') - , table = require('tab'); - -/** - * Metrics collection and generation. - * - * @constructor - * @param {Number} requests The total amount of requests scheduled to be send - */ -function Metrics(requests) { - this.requests = requests; // The total amount of requests send - - this.connections = 0; // Connections established - this.disconnects = 0; // Closed connections - this.failures = 0; // Connections that received an error - - this.errors = Object.create(null); // Collection of different errors - this.timing = Object.create(null); // Different timings - - this.latency = new Stats(); // Latencies of the echo'd messages - this.handshaking = new Stats(); // Handshake duration - - this.read = 0; // Bytes read - this.send = 0; // Bytes send - - // Start tracking - this.start(); -} - -/** - * The metrics has started collecting. - * - * @api public - */ -Metrics.prototype.start = function start() { - this.timing.start = Date.now(); - return this; -}; - -/** - * The metrics has stopped collecting. - * - * @api public - */ -Metrics.prototype.stop = function stop() { - if (this.timing.stop) return this; - - this.timing.stop = Date.now(); - this.timing.duration = this.timing.stop - this.timing.start; - return this; -}; - -/** - * All the connections are established - * - * @api public - */ -Metrics.prototype.established = function established() { - if (this.timing.established) return this; - - this.timing.ready = Date.now(); - this.timing.established = this.timing.ready - this.timing.start; - return this; -}; - -/** - * Log an new error. - * - * @param {Object} data The error - * @api public - */ -Metrics.prototype.error = function error(data) { - this.failures++; - - var collection = this.errors[data.message]; - if (!collection) this.errors[data.message] = 1; - else this.errors[data.message]++; - - return this; -}; - -/** - * Register a message resposne. - * - * @param {Object} data The message details. - * @api public - */ -Metrics.prototype.message = function message(data) { - this.latency.push(data.latency); - - return this; -}; - -/** - * Register a successful handshake + open. - * - * @param {Object} data Handshake details. - * @api public - */ -Metrics.prototype.handshaken = function handshaken(data) { - this.connections++; - this.handshaking.push(data.duration); - - return this; -}; - -/** - * The connection has closed. - * - * @param {Object} data Close information - * @api public - */ -Metrics.prototype.close = function close(data) { - this.disconnections++; - this.read += data.read; - this.send += data.send; - - return this; -}; - -/** - * Generate a summary of the metrics. - * - * @returns {Object} The summary - * @api public - */ -Metrics.prototype.summary = function summary() { - var results = new table.TableOutputStream({ columns: [ - { label: '', width: 20 }, - { label: '' } - ]}); - - console.log(); - results.writeRow(['Online', this.timing.established + ' milliseconds']); - results.writeRow(['Time taken', this.timing.duration + ' milliseconds']); - results.writeRow(['Connected', this.connections]); - results.writeRow(['Disconnected', this.disconnects]); - results.writeRow(['Failed', this.failures]); - - results.writeRow(['Total transferred', this.send.bytes(2)]); - results.writeRow(['Total received', this.read.bytes(2)]); - - // Up next is outputting the series. - var handshaking = this.handshaking - , latency = this.latency - , hrange = handshaking.range() - , lrange = latency.range(); - - // - // Generate the width of the columns, based on the length of the longest - // number. If it's less then the max size of a label, we default to that. - // After that we also pad the strings with 1 char for extra spacing. - // - var width = (lrange[1] > hrange[1] ? lrange[1] : hrange[1]).toString().length; - if (width < 6) width = 6; - width++; - - console.log(); - console.log('Durations (ms):'); - console.log(); - - table.emitTable({ - columns: [ - { label: '', width: 20 }, - { label: 'min', width: width, align: 'left' }, - { label: 'mean', width: width, align: 'left' }, - { label: 'stddev', width: width, align: 'right' }, - { label: 'median', width: width, align: 'right' }, - { label: 'max', width: width, align: 'left' } - ], - rows: [ - [ - 'Handshaking', - hrange[0].toFixed(), - handshaking.amean().toFixed(), - handshaking.stddev().toFixed(), - handshaking.median().toFixed(), - hrange[1].toFixed() - ], - [ - 'Latency', - lrange[0].toFixed(), - latency.amean().toFixed(), - latency.stddev().toFixed(), - latency.median().toFixed(), - lrange[1].toFixed() - ] - ] - }); - - console.log(); - console.log('Percentile (ms):'); - console.log(); - - table.emitTable({ - columns: [ - { label: '', width: 20 }, - { label: ' 50%', width: width }, - { label: ' 66%', width: width }, - { label: ' 75%', width: width }, - { label: ' 80%', width: width }, - { label: ' 90%', width: width }, - { label: ' 95%', width: width }, - { label: ' 98%', width: width }, - { label: ' 98%', width: width }, - { label: '100%', width: width }, - ], - rows: [ - [ - 'Handshaking', - handshaking.percentile(50).toFixed(), - handshaking.percentile(66).toFixed(), - handshaking.percentile(75).toFixed(), - handshaking.percentile(80).toFixed(), - handshaking.percentile(90).toFixed(), - handshaking.percentile(95).toFixed(), - handshaking.percentile(98).toFixed(), - handshaking.percentile(99).toFixed(), - handshaking.percentile(100).toFixed() - ], - [ - 'Latency', - latency.percentile(50).toFixed(), - latency.percentile(66).toFixed(), - latency.percentile(75).toFixed(), - latency.percentile(80).toFixed(), - latency.percentile(90).toFixed(), - latency.percentile(95).toFixed(), - latency.percentile(98).toFixed(), - latency.percentile(99).toFixed(), - latency.percentile(100).toFixed() - ] - ] - }); - - // - // Output more error information, there could be multiple causes on why we - // failed to send a message. - // - if (this.failures) { - console.log(); - console.log('Received errors:'); - console.log(); - - Object.keys(this.errors).forEach(function error(err) { - results.writeRow([this.errors[err] +'x', err]); - }, this); - } - - return this; -}; - -// -// Expose the metrics constructor. -// -module.exports = Metrics; +'use strict'; + +var Stats = require('fast-stats').Stats + , colors = require('colors') + , sugar = require('sugar') + , table = require('tab'); + +/** + * Metrics collection and generation. + * + * @constructor + * @param {Number} requests The total amount of requests scheduled to be send + */ +function Metrics(requests) { + this.requests = requests; // The total amount of requests send + + this.connections = 0; // Connections established + this.disconnects = 0; // Closed connections + this.failures = 0; // Connections that received an error + + this.errors = Object.create(null); // Collection of different errors + this.timing = Object.create(null); // Different timings + + this.latency = new Stats(); // Latencies of the echo'd messages + this.handshaking = new Stats(); // Handshake duration + + this.read = 0; // Bytes read + this.send = 0; // Bytes send + + // Start tracking + this.start(); +} + +/** + * The metrics has started collecting. + * + * @api public + */ +Metrics.prototype.start = function start() { + this.timing.start = Date.now(); + return this; +}; + +/** + * The metrics has stopped collecting. + * + * @api public + */ +Metrics.prototype.stop = function stop() { + if (this.timing.stop) return this; + + this.timing.stop = Date.now(); + this.timing.duration = this.timing.stop - this.timing.start; + return this; +}; + +/** + * All the connections are established + * + * @api public + */ +Metrics.prototype.established = function established() { + if (this.timing.established) return this; + + this.timing.ready = Date.now(); + this.timing.established = this.timing.ready - this.timing.start; + return this; +}; + +/** + * Log an new error. + * + * @param {Object} data The error + * @api public + */ +Metrics.prototype.error = function error(data) { + this.failures++; + + var collection = this.errors[data.message]; + if (!collection) this.errors[data.message] = 1; + else this.errors[data.message]++; + + return this; +}; + +/** + * Register a message resposne. + * + * @param {Object} data The message details. + * @api public + */ +Metrics.prototype.message = function message(data) { + this.latency.push(data.latency); + + return this; +}; + +/** + * Register a successful handshake + open. + * + * @param {Object} data Handshake details. + * @api public + */ +Metrics.prototype.handshaken = function handshaken(data) { + this.connections++; + this.handshaking.push(data.duration); + + return this; +}; + +/** + * The connection has closed. + * + * @param {Object} data Close information + * @api public + */ +Metrics.prototype.close = function close(data) { + this.disconnections++; + this.read += data.read; + this.send += data.send; + + return this; +}; + +/** + * Generate a summary of the metrics. + * + * @returns {Object} The summary + * @api public + */ +Metrics.prototype.summary = function summary() { + var results = new table.TableOutputStream({ columns: [ + { label: '', width: 20 }, + { label: '' } + ]}); + + console.log(); + results.writeRow(['Online', this.timing.established + ' milliseconds']); + results.writeRow(['Time taken', this.timing.duration + ' milliseconds']); + results.writeRow(['Connected', this.connections]); + results.writeRow(['Disconnected', this.disconnects]); + results.writeRow(['Failed', this.failures]); + + results.writeRow(['Total transferred', this.send.bytes(2)]); + results.writeRow(['Total received', this.read.bytes(2)]); + + // Up next is outputting the series. + var handshaking = this.handshaking + , latency = this.latency + , hrange = handshaking.range() + , lrange = latency.range(); + + // + // Generate the width of the columns, based on the length of the longest + // number. If it's less then the max size of a label, we default to that. + // After that we also pad the strings with 1 char for extra spacing. + // + var width = (lrange[1] > hrange[1] ? lrange[1] : hrange[1]).toString().length; + if (width < 6) width = 6; + width++; + + console.log(); + console.log('Durations (ms):'); + console.log(); + + table.emitTable({ + columns: [ + { label: '', width: 20 }, + { label: 'min', width: width, align: 'left' }, + { label: 'mean', width: width, align: 'left' }, + { label: 'stddev', width: width, align: 'right' }, + { label: 'median', width: width, align: 'right' }, + { label: 'max', width: width, align: 'left' } + ], + rows: [ + [ + 'Handshaking', + hrange[0].toFixed(), + handshaking.amean().toFixed(), + handshaking.stddev().toFixed(), + handshaking.median().toFixed(), + hrange[1].toFixed() + ], + [ + 'Latency', + lrange[0].toFixed(), + latency.amean().toFixed(), + latency.stddev().toFixed(), + latency.median().toFixed(), + lrange[1].toFixed() + ] + ] + }); + + console.log(); + console.log('Percentile (ms):'); + console.log(); + + table.emitTable({ + columns: [ + { label: '', width: 20 }, + { label: ' 50%', width: width }, + { label: ' 66%', width: width }, + { label: ' 75%', width: width }, + { label: ' 80%', width: width }, + { label: ' 90%', width: width }, + { label: ' 95%', width: width }, + { label: ' 98%', width: width }, + { label: ' 98%', width: width }, + { label: '100%', width: width }, + ], + rows: [ + [ + 'Handshaking', + handshaking.percentile(50).toFixed(), + handshaking.percentile(66).toFixed(), + handshaking.percentile(75).toFixed(), + handshaking.percentile(80).toFixed(), + handshaking.percentile(90).toFixed(), + handshaking.percentile(95).toFixed(), + handshaking.percentile(98).toFixed(), + handshaking.percentile(99).toFixed(), + handshaking.percentile(100).toFixed() + ], + [ + 'Latency', + latency.percentile(50).toFixed(), + latency.percentile(66).toFixed(), + latency.percentile(75).toFixed(), + latency.percentile(80).toFixed(), + latency.percentile(90).toFixed(), + latency.percentile(95).toFixed(), + latency.percentile(98).toFixed(), + latency.percentile(99).toFixed(), + latency.percentile(100).toFixed() + ] + ] + }); + + // + // Output more error information, there could be multiple causes on why we + // failed to send a message. + // + if (this.failures) { + console.log(); + console.log('Received errors:'); + console.log(); + + Object.keys(this.errors).forEach(function error(err) { + results.writeRow([this.errors[err] +'x', err]); + }, this); + } + + return this; +}; + +// +// Expose the metrics constructor. +// +module.exports = Metrics; diff --git a/mjolnir.js b/mjolnir.js index c22f517..c282c2f 100644 --- a/mjolnir.js +++ b/mjolnir.js @@ -1,120 +1,207 @@ -'use strict'; - -var Socket = require('ws') - , connections = {} - , concurrent = 0; - -// -// Get the session document that is used to generate the data. -// -var session = require(process.argv[2]); - -// -// WebSocket connection details. -// -var masked = process.argv[4] === 'true' - , binary = process.argv[5] === 'true' - , protocol = +process.argv[3] || 13; - -process.on('message', function message(task) { - var now = Date.now(); - - // - // Write a new message to the socket. The message should have a size of x - // - if ('write' in task) { - Object.keys(connections).forEach(function write(id) { - write(connections[id], task, id); - }); - } - - // - // Shut down every single socket. - // - if (task.shutdown) { - Object.keys(connections).forEach(function shutdown(id) { - connections[id].close(); - }); - } - - // End of the line, we are gonna start generating new connections. - if (!task.url) return; - - var socket = new Socket(task.url, { - protocolVersion: protocol - }); - - socket.on('open', function open() { - process.send({ type: 'open', duration: Date.now() - now, id: task.id, concurrent: concurrent }); - write(socket, task, task.id); - - // As the `close` event is fired after the internal `_socket` is cleaned up - // we need to do some hacky shit in order to tack the bytes send. - }); - - socket.on('message', function message(data) { - process.send({ - type: 'message', latency: Date.now() - socket.last, concurrent: concurrent, - id: task.id - }); - - // Only write as long as we are allowed to send messages - if (--task.messages) { - write(socket, task, task.id); - } else { - socket.close(); - } - }); - - socket.on('close', function close() { - var internal = socket._socket || {}; - - process.send({ - type: 'close', id: task.id, concurrent: --concurrent, - read: internal.bytesRead || 0, - send: internal.bytesWritten || 0 - }); - - delete connections[task.id]; - }); - - socket.on('error', function error(err) { - process.send({ type: 'error', message: err.message, id: task.id, concurrent: --concurrent }); - - socket.close(); - delete connections[task.id]; - }); - - // Adding a new socket to our socket collection. - ++concurrent; - connections[task.id] = socket; -}); - -/** - * Helper function from writing messages to the socket. - * - * @param {WebSocket} socket WebSocket connection we should write to - * @param {Object} task The given task - * @param {String} id - * @param {Function} fn The callback - * @api private - */ -function write(socket, task, id, fn) { - session[binary ? 'binary' : 'utf8'](task.size, function message(err, data) { - var start = socket.last = Date.now(); - - socket.send(data, { - binary: binary, - mask: masked - }, function sending(err) { - if (err) { - process.send({ type: 'error', message: err.message, concurrent: --concurrent, id: id }); - - socket.close(); - delete connections[id]; - } - - if (fn) fn(err); - }); - }); -} +'use strict'; + +var Socket = require('ws') + , connections = {} + , concurrent = 0; + +// +// Get the session document that is used to generate the data. +// +var session = require(process.argv[2]); + +// +// WebSocket connection details. +// +var masked = process.argv[4] === 'true' + , binary = process.argv[5] === 'true' + , protocol = +process.argv[3] || 13; + +// collect metics datas +var metrics_datas = {collection:true, datas:[]} + , statInterval = +process.argv[6] || 60 + , process_send = function(data, task) { + if (statInterval <= 0 || ('open' == data.type && task.nextTask)) { + process.send(data); + }else{ + metrics_datas.datas.push(data); + } + } + , process_sendAll = function(end) { + if (metrics_datas.datas.length <= 0) { + return; + }; + // send all data to parent + process.send(metrics_datas, null, function clearDatas(err){ + // invoked after the message is sent but before the target may have received it + if (err) {return;} + // WARNING: maybe we should use synchronize method here + metrics_datas.datas = []; + if (end) { + process.exit(); + }; + }); + } + , checkConnectionLength = function(){ + if (Object.keys(connections).length <= 0) { + process_sendAll(true); + } + } + , workerStatInterval = setInterval(function () { + process_sendAll(); + }, statInterval * 1000); + +process.on('message', function message(task) { + var now = Date.now(); + + // + // Write a new message to the socket. The message should have a size of x + // + if ('write' in task) { + Object.keys(connections).forEach(function write(id) { + write(connections[id], task, id); + }); + } + + // + // Shut down every single socket. + // + if (task.shutdown) { + Object.keys(connections).forEach(function shutdown(id) { + connections[id] && connections[id].close(); + }); + } + + // End of the line, we are gonna start generating new connections. + if (!task.url) return; + + var socket = new Socket(task.url, { + protocolVersion: protocol, + localAddress: task.localaddr || null + }); + socket.last = Date.now(); + var pingInterval = null; + + socket.on('open', function open() { + process_send({ type: 'open', duration: Date.now() - now, id: task.id, concurrent: concurrent, workerid: process.pid }, task); + // write(socket, task, task.id); + + if (task.pingInterval && task.pingInterval > 0) { + pingInterval = setInterval(function ping(id, socket) { + if(socket){ + if(task.serverEngine && -1 != ['socket.io','engine.io','netty-socketio'].indexOf(task.serverEngine)) { + socket.send('2'); + }else{ + socket.ping(); + } + }else{ + delete connections[task.id]; + clearInterval(pingInterval); + checkConnectionLength(); + } + }, task.pingInterval * 1000, task.id, socket); + } + // As the `close` event is fired after the internal `_socket` is cleaned up + // we need to do some hacky shit in order to tack the bytes send. + }); + + socket.on('message', function message(data) { + process_send({ + type: 'message', latency: Date.now() - socket.last, concurrent: concurrent, workerid: process.pid, + id: task.id + }, task); + + // Only write as long as we are allowed to send messages + if (task.messages > 0) + if (--task.messages) { + write(socket, task, task.id); + } else { + socket.close(); + socket.emit('close'); + } + }); + socket.on('onMessage', function onMessage(data) { + process_send({ + type: 'message', latency: Date.now() - socket.last, concurrent: concurrent, workerid: process.pid, + id: task.id + }, task); + + // Only write as long as we are allowed to send messages + if (task.messages > 0) + if (--task.messages) { + write(socket, task, task.id); + } else { + socket.close(); + socket.emit('close'); + } + }); + + socket.on('close', function close() { + var internal = socket._socket || {}; + + process_send({ + type: 'close', id: task.id, concurrent: --concurrent, workerid: process.pid, + read: internal.bytesRead || 0, + send: internal.bytesWritten || 0 + }, task); + + delete connections[task.id]; + clearInterval(pingInterval); + checkConnectionLength(); + }); + + socket.on('error', function error(err) { + process_send({ type: 'error', message: err.message, id: task.id, concurrent: --concurrent, workerid: process.pid }, task); + + socket.close(); + socket.emit('close'); + delete connections[task.id]; + checkConnectionLength(); + }); + + // Adding a new socket to our socket collection. + ++concurrent; + connections[task.id] = socket; + + // timeout to close socket + if (task.runtime && task.runtime > 0) { + setTimeout(function timeoutToCloseSocket(id, socket) { + socket.close(); + socket.emit('close'); + }, task.runtime * 1000, task.id, socket); + } +}); + +process.on('SIGINT', function () {}); +process.on('exit', function () {}); + +/** + * Helper function from writing messages to the socket. + * + * @param {WebSocket} socket WebSocket connection we should write to + * @param {Object} task The given task + * @param {String} id + * @param {Function} fn The callback + * @param {String} data + * @api private + */ +function write(socket, task, id, fn, data) { + // i thank the generator doesn't make any sense, but just let me do some change and leave it alone + session[binary ? 'binary' : 'utf8'](data || task.size, function message(err, data) { + var start = socket.last = Date.now(); + + socket.send(data, { + binary: binary, + mask: masked + }, function sending(err) { + if (err) { + process_send({ type: 'error', message: err.message, concurrent: --concurrent, workerid: process.pid, id: id }, task); + + socket.close(); + socket.emit('close'); + delete connections[id]; + } + + if (fn) fn(err); + }); + }); +} diff --git a/package.json b/package.json index bdfe97b..1581765 100644 --- a/package.json +++ b/package.json @@ -1,32 +1,35 @@ -{ - "name": "thor", - "version": "1.0.0", - "description": "Thor is WebSocket benchmark utility", - "main": "index.js", - "scripts": { - "test": "echo \"Error: no test specified\" && exit 1" - }, - "repository": { - "type": "git", - "url": "https://github.com/observing/thor.git" - }, - "keywords": [ - "WebSocket", - "benchmark", - "load" - ], - "author": "Arnout Kazemier ", - "license": "MIT", - "dependencies": { - "commander": "1.1.x", - "async": "0.2.x", - "tab": "0.1.x", - "colors": "0.6.x", - "ws": "git://github.com/einaros/ws.git", - "fast-stats": "git://github.com/bluesmoon/node-faststats.git", - "sugar": "1.3.8" - }, - "bin": { - "thor": "./bin/thor" - } -} +{ + "name": "thor", + "version": "1.1.2", + "description": "Thor is WebSocket benchmark utility", + "main": "index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "repository": { + "type": "git", + "url": "https://github.com/iorichina/thor.git" + }, + "keywords": [ + "WebSocket", + "benchmark", + "load" + ], + "author": "Arnout Kazemier ", + "license": "MIT", + "dependencies": { + "engine.io-client": "git://github.com/iorichina/engine.io-client.git#1.5.4", + "socket.io-client": "1.3.7", + "commander": "1.1.x", + "async": "0.2.x", + "tab": "0.1.x", + "colors": "0.6.x", + "ws": "git://github.com/einaros/ws.git", + "fast-stats": "git://github.com/bluesmoon/node-faststats.git", + "sugar": "1.3.8" + }, + "bin": { + "thor": "./bin/thor", + "thor-socketio": "./bin/socketio" + } +} diff --git a/worker-socketio.js b/worker-socketio.js new file mode 100644 index 0000000..9c83797 --- /dev/null +++ b/worker-socketio.js @@ -0,0 +1,203 @@ +'use strict'; + +var Socket = require('socket.io-client') + , connections = {} + , concurrent = 0; + +// +// Get the session document that is used to generate the data. +// +var session = require(process.argv[2]); + +// +// WebSocket connection details. +// +var masked = process.argv[4] === 'true' + , binary = process.argv[5] === 'true' + , protocol = +process.argv[3] || 13; + +// collect metics datas +var metrics_datas = {collection:true, datas:[]} + , statInterval = +process.argv[6] || 60 + , logError = +process.argv[7] || 0 + , process_send = function(data, task) { + if (statInterval <= 0 || ('open' == data.type && task.nextTask)) { + process.send(data); + }else{ + metrics_datas.datas.push(data); + } + } + , process_sendAll = function(end) { + if (metrics_datas.datas.length <= 0) { + return; + }; + // send all data to parent + process.send(metrics_datas, null, function clearDatas(err){ + // invoked after the message is sent but before the target may have received it + if (err) {return;} + // WARNING: maybe we should use synchronize method here + metrics_datas.datas = []; + if (end) { + process.exit(); + }; + }); + } + , checkConnectionLength = function(){ + if (Object.keys(connections).length <= 0) { + process_sendAll(true); + } + } + , workerStatInterval = setInterval(function () { + process_sendAll(); + }, statInterval * 1000); + +process.on('message', function message(task) { + var start_timestamp = Date.now(); + + // + // Write a new message to the socket. The message should have a size of x + // + if ('write' in task) { + Object.keys(connections).forEach(function write(id) { + write(connections[id], task, id); + }); + } + + // + // Shut down every single socket. + // + if (task.shutdown) { + Object.keys(connections).forEach(function shutdown(id) { + connections[id] && connections[id].disconnect(); + }); + } + + // End of the line, we are gonna start generating new connections. + if (!task.url) return; + + var socket = new Socket(task.url, { + 'force new connection': true, + reconnection: false, + timeout: task.connectTimeout * 1000, + transports: task.transport, + protocolVersion: protocol, + localAddress: task.localaddr || null, + headers: {'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.93 Safari/537.36'} + }); + socket.last = Date.now(); + var pingInterval = null; + + socket.on('connect', function open() { + process_send({ type: 'open', duration: Date.now() - start_timestamp, id: task.id, concurrent: concurrent, cur_time: Date.now(), wid: process.pid }, task); + // write(socket, task, task.id); + + // As the `close` event is fired after the internal `_socket` is cleaned up + // we need to do some hacky shit in order to tack the bytes send. + }); + + socket.on(process.env.NODE_ON_MESSAGE?process.env.NODE_ON_MESSAGE:'message', function message(data) { + process_send({ + type: 'message', latency: Date.now() - socket.last, + id: task.id, wid: process.pid + }, task); + + // Only write as long as we are allowed to send messages + if (task.messages > 0) + if (--task.messages) { + write(socket, task, task.id); + } else { + socket.disconnect(); + socket.emit('disconnect'); + } + }); + + socket.on('disconnect', function close(msg) { + var err = msg=='error' && arguments.length > 2 ? Array.prototype.slice.call(arguments, 1, 2).pop() : null; + if (err && logError) { + console.error(err); + } + + var internal = {}; + try{ + internal = socket.io.engine.transport.ws._socket || {}; + }catch(e){ + // console.log(socket.io.engine.transport.pollXhr); + } + + process_send({ + type: 'close', id: task.id, wid: process.pid, concurrent: --concurrent, cur_time: Date.now(), + read: internal.bytesRead || 0, + send: internal.bytesWritten || 0 + }, task); + + delete connections[task.id]; + clearInterval(pingInterval); + checkConnectionLength(); + }); + + socket.on('error', function error(err) { + process_send({ type: 'error', message: err.description ? err.description.message : (err.message?err.message:err), id: task.id, wid: process.pid }, task); + + socket.disconnect('error', err); + socket.emit('disconnect', 'error', err); + delete connections[task.id]; + checkConnectionLength(); + }); + + // catch ECONNREFUSED + socket.io.on('connect_error', function connect_error(err){ + process_send({ type: 'error', message: err.description ? err.description.message : (err.message?err.message:err), id: task.id, wid: process.pid }, task); + + socket.disconnect('error', err); + socket.emit('disconnect', 'error', err); + delete connections[task.id]; + checkConnectionLength(); + }); + + // Adding a new socket to our socket collection. + ++concurrent; + connections[task.id] = socket; + + // timeout to close socket + if (task.runtime && task.runtime > 0) { + setTimeout(function timeoutToCloseSocket(id, socket) { + socket.disconnect(); + socket.emit('disconnect'); + }, task.runtime * 1000, task.id, socket); + } +}); + +process.on('SIGINT', function () {}); +process.on('exit', function () {}); + +/** + * Helper function from writing messages to the socket. + * + * @param {WebSocket} socket WebSocket connection we should write to + * @param {Object} task The given task + * @param {String} id + * @param {Function} fn The callback + * @param {String} data + * @api private + */ +function write(socket, task, id, fn, data) { + // i thank the generator doesn't make any sense, but just let me do some change and leave it alone + session[binary ? 'binary' : 'utf8'](data || task.size, function message(err, data) { + var start = socket.last = Date.now(); + + socket.send(data, { + binary: binary, + mask: masked + }, function sending(err) { + if (err) { + process_send({ type: 'error', message: err.description ? err.description.message : (err.message?err.message:err), id: task.id, wid: process.pid }, task); + + socket.disconnect('error', err); + socket.emit('disconnect', 'error', err); + delete connections[id]; + } + + if (fn) fn(err); + }); + }); +}