From 50b0701b5c8b041e520f9cd397efc5e1a409e09c Mon Sep 17 00:00:00 2001
From: Nathan Peck
Date: Mon, 13 Oct 2014 11:02:57 -0400
Subject: [PATCH 1/2] Typo in the README

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 1f18c79..709101d 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@ A pipeable write stream which uploads to Amazon S3 using the multipart file uplo
 
 #### 1.0.4 (2014-10-13)
 
-Getting rid of the use of setImmeadiate. Also now the MPU is not initialized until data is actually received by the writable stream, and error checking verifies that data has actually been uploaded to S3 before trying to end the stream. This fixes an issue where empty incoming streams were causing errors to come back from S3 as the module was attempting to complete an empty MPU.
+Getting rid of the use of setImmediate. Also now the MPU is not initialized until data is actually received by the writable stream, and error checking verifies that data has actually been uploaded to S3 before trying to end the stream. This fixes an issue where empty incoming streams were causing errors to come back from S3 as the module was attempting to complete an empty MPU.
 
 #### 1.0.3 (2014-10-12)
 

From 3a499c0f2ae34df4a9884eee4e6aafdf3e5b062e Mon Sep 17 00:00:00 2001
From: Izaak Schroeder
Date: Wed, 15 Oct 2014 20:47:11 -0700
Subject: [PATCH 2/2] Provide machinery to use this framework without globals.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Having global shared state is always a bad thing™. So we allow the creation
of individual instances of streaming clients, each with their own associated
AWS.S3 instance. The new code looks something like:

```javascript
var client = require('s3-upload-stream')(AWS.S3());
var stream = client.upload({ bucket: "...", key: "..." });
```

Backwards compatibility is also shoehorned in, so those who are already using
the library can continue to do so just as they were before. Version bump
accordingly.
---
 lib/s3-upload-stream.js | 489 +++++++++++++++++++++-------------------
 package.json            |   2 +-
 2 files changed, 255 insertions(+), 236 deletions(-)

diff --git a/lib/s3-upload-stream.js b/lib/s3-upload-stream.js
index 8645232..52d5fd4 100644
--- a/lib/s3-upload-stream.js
+++ b/lib/s3-upload-stream.js
@@ -1,265 +1,284 @@
 var Writable = require('stream').Writable,
     events = require("events");
 
-var cachedClient;
+// Set the S3 client to be used for this upload.
+function Client(client) {
+  if (this instanceof Client === false) {
+    return new Client(client);
+  }
 
-module.exports = {
-  // Set the S3 client to be used for this upload.
-  client: function (client) {
-    cachedClient = client;
-  },
+  if (!client) {
+    throw new Error('Must configure an S3 client before attempting to create an S3 upload stream.');
+  }
 
-  // Generate a writeable stream which uploads to a file on S3.
-  upload: function (destinationDetails) {
-    var e = new events.EventEmitter();
+  this.cachedClient = client;
+}
 
-    // Create the writeable stream interface.
-    var ws = new Writable({
-      highWaterMark: 4194304 // 4 MB
-    });
+// Generate a writeable stream which uploads to a file on S3.
+Client.prototype.upload = function (destinationDetails) {
 
-    // Data pertaining to the overall upload
-    var multipartUploadID;
-    var partNumber = 1;
-    var partIds = [];
-    var receivedSize = 0;
-    var uploadedSize = 0;
-
-    // Parts which need to be uploaded to S3.
-    var pendingParts = 0;
-    var concurrentPartThreshold = 1;
-
-    // Data pertaining to buffers we have received
-    var receivedBuffers = [];
-    var receivedBuffersLength = 0;
-    var partSizeThreshold = 5242880;
-
-    // Set the maximum amount of data that we will keep in memory before flushing it to S3 as a part
-    // of the multipart upload
-    ws.maxPartSize = function (partSize) {
-      if (partSize < 5242880)
-        partSize = 5242880;
-
-      partSizeThreshold = partSize;
-      return ws;
-    };
-
-    ws.getMaxPartSize = function () {
-      return partSizeThreshold;
-    };
-
-    // Set the maximum amount of data that we will keep in memory before flushing it to S3 as a part
-    // of the multipart upload
-    ws.concurrentParts = function (parts) {
-      if (parts < 1)
-        parts = 1;
-
-      concurrentPartThreshold = parts;
-      return ws;
-    };
-
-    ws.getConcurrentParts = function () {
-      return concurrentPartThreshold;
-    };
-
-    // Handler to receive data and upload it to S3.
-    ws._write = function (incomingBuffer, enc, next) {
-      absorbBuffer(incomingBuffer);
-
-      if (receivedBuffersLength < partSizeThreshold)
-        return next(); // Ready to receive more data in _write.
-
-      // We need to upload some data
-      uploadHandler(next);
-    };
-
-    // Concurrenly upload parts to S3.
-    var uploadHandler = function (next) {
-      if (pendingParts < concurrentPartThreshold) {
-        // Has the MPU been created yet?
-        if (multipartUploadID)
-          upload(); // Upload the part immeadiately.
-        else {
-          e.once('ready', upload); // Wait until multipart upload is initialized.
-          createMultipartUpload();
-        }
-      }
+  var cachedClient = this.cachedClient;
+
+  var e = new events.EventEmitter();
+
+  // Create the writeable stream interface.
+  var ws = new Writable({
+    highWaterMark: 4194304 // 4 MB
+  });
+
+  // Data pertaining to the overall upload
+  var multipartUploadID;
+  var partNumber = 1;
+  var partIds = [];
+  var receivedSize = 0;
+  var uploadedSize = 0;
+
+  // Parts which need to be uploaded to S3.
+  var pendingParts = 0;
+  var concurrentPartThreshold = 1;
+
+  // Data pertaining to buffers we have received
+  var receivedBuffers = [];
+  var receivedBuffersLength = 0;
+  var partSizeThreshold = 5242880;
+
+  // Set the maximum amount of data that we will keep in memory before flushing it to S3 as a part
+  // of the multipart upload
+  ws.maxPartSize = function (partSize) {
+    if (partSize < 5242880)
+      partSize = 5242880;
+
+    partSizeThreshold = partSize;
+    return ws;
+  };
+
+  ws.getMaxPartSize = function () {
+    return partSizeThreshold;
+  };
+
+  // Set the maximum amount of data that we will keep in memory before flushing it to S3 as a part
+  // of the multipart upload
+  ws.concurrentParts = function (parts) {
+    if (parts < 1)
+      parts = 1;
+
+    concurrentPartThreshold = parts;
+    return ws;
+  };
+
+  ws.getConcurrentParts = function () {
+    return concurrentPartThreshold;
+  };
+
+  // Handler to receive data and upload it to S3.
+  ws._write = function (incomingBuffer, enc, next) {
+    absorbBuffer(incomingBuffer);
+
+    if (receivedBuffersLength < partSizeThreshold)
+      return next(); // Ready to receive more data in _write.
+
+    // We need to upload some data
+    uploadHandler(next);
+  };
+
+  // Concurrently upload parts to S3.
+  var uploadHandler = function (next) {
+    if (pendingParts < concurrentPartThreshold) {
+      // Has the MPU been created yet?
+      if (multipartUploadID)
+        upload(); // Upload the part immediately.
       else {
-        // Block uploading (and receiving of more data) until we upload
-        // some of the pending parts
-        e.once('part', upload);
+        e.once('ready', upload); // Wait until multipart upload is initialized.
+        createMultipartUpload();
       }
+    }
+    else {
+      // Block uploading (and receiving of more data) until we upload
+      // some of the pending parts
+      e.once('part', upload);
+    }
 
-      function upload() {
-        pendingParts++;
-        flushPart(function (partDetails) {
-          --pendingParts;
-          e.emit('part'); // Internal event
-          ws.emit('part', partDetails); // External event
-        });
-        next();
-      }
-    };
-
-    // Absorb an incoming buffer from _write into a buffer queue
-    var absorbBuffer = function (incomingBuffer) {
-      receivedBuffers.push(incomingBuffer);
-      receivedBuffersLength += incomingBuffer.length;
-    };
-
-    // Take a list of received buffers and return a combined buffer that is exactly
-    // partSizeThreshold in size.
-    var preparePartBuffer = function () {
-      // Combine the buffers we've received and reset the list of buffers.
-      var combinedBuffer = Buffer.concat(receivedBuffers, receivedBuffersLength);
-      receivedBuffers.length = 0; // Trick to reset the array while keeping the original reference
-      receivedBuffersLength = 0;
-
-      if (combinedBuffer.length > partSizeThreshold) {
-        // The combined buffer is too big, so slice off the end and put it back in the array.
-        var remainder = new Buffer(combinedBuffer.length - partSizeThreshold);
-        combinedBuffer.copy(remainder, 0, partSizeThreshold);
-        receivedBuffers.push(remainder);
-        receivedBuffersLength = remainder.length;
-
-        // Return the original buffer.
-        return combinedBuffer.slice(0, partSizeThreshold);
-      }
-      else {
-        // It just happened to be perfectly sized, so return it.
-        return combinedBuffer;
+    function upload() {
+      pendingParts++;
+      flushPart(function (partDetails) {
+        --pendingParts;
+        e.emit('part'); // Internal event
+        ws.emit('part', partDetails); // External event
+      });
+      next();
+    }
+  };
+
+  // Absorb an incoming buffer from _write into a buffer queue
+  var absorbBuffer = function (incomingBuffer) {
+    receivedBuffers.push(incomingBuffer);
+    receivedBuffersLength += incomingBuffer.length;
+  };
+
+  // Take a list of received buffers and return a combined buffer that is exactly
+  // partSizeThreshold in size.
+  var preparePartBuffer = function () {
+    // Combine the buffers we've received and reset the list of buffers.
+    var combinedBuffer = Buffer.concat(receivedBuffers, receivedBuffersLength);
+    receivedBuffers.length = 0; // Trick to reset the array while keeping the original reference
+    receivedBuffersLength = 0;
+
+    if (combinedBuffer.length > partSizeThreshold) {
+      // The combined buffer is too big, so slice off the end and put it back in the array.
+      var remainder = new Buffer(combinedBuffer.length - partSizeThreshold);
+      combinedBuffer.copy(remainder, 0, partSizeThreshold);
+      receivedBuffers.push(remainder);
+      receivedBuffersLength = remainder.length;
+
+      // Return the original buffer.
+      return combinedBuffer.slice(0, partSizeThreshold);
+    }
+    else {
+      // It just happened to be perfectly sized, so return it.
+      return combinedBuffer;
+    }
+  };
+
+  // Flush a part out to S3.
+  var flushPart = function (callback) {
+    var partBuffer = preparePartBuffer();
+
+    var localPartNumber = partNumber;
+    partNumber++;
+    receivedSize += partBuffer.length;
+    cachedClient.uploadPart(
+      {
+        Body: partBuffer,
+        Bucket: destinationDetails.Bucket,
+        Key: destinationDetails.Key,
+        UploadId: multipartUploadID,
+        PartNumber: localPartNumber
+      },
+      function (err, result) {
+        if (err)
+          abortUpload('Failed to upload a part to S3: ' + JSON.stringify(err));
+        else {
+          uploadedSize += partBuffer.length;
+          partIds[localPartNumber - 1] = {
+            ETag: result.ETag,
+            PartNumber: localPartNumber
+          };
+
+          callback({
+            ETag: result.ETag,
+            PartNumber: localPartNumber,
+            receivedSize: receivedSize,
+            uploadedSize: uploadedSize
+          });
+        }
       }
-    };
+    );
+  };
+
+  // Overwrite the end method so that we can hijack it to flush the last part and then complete
+  // the multipart upload
+  ws.originalEnd = ws.end;
+  ws.end = function (Part, encoding, callback) {
+    ws.originalEnd(Part, encoding, function afterDoneWithOriginalEnd() {
+      if (Part)
+        absorbBuffer(Part);
+
+      // Upload any remaining data
+      var uploadRemainingData = function () {
+        if (receivedBuffersLength > 0) {
+          uploadHandler(uploadRemainingData);
+          return;
+        }
+
+        if (pendingParts > 0) {
+          setTimeout(uploadRemainingData, 50); // Wait 50 ms for the pending uploads to finish before trying again.
+          return;
+        }
+
+        completeUpload();
+      };
 
-    // Flush a part out to S3.
-    var flushPart = function (callback) {
-      var partBuffer = preparePartBuffer();
+      uploadRemainingData();
 
-      var localPartNumber = partNumber;
-      partNumber++;
-      receivedSize += partBuffer.length;
-      cachedClient.uploadPart(
+      if (typeof callback == 'function')
+        callback();
+    });
+  };
+
+  // Turn all the individual parts we uploaded to S3 into a finalized upload.
+  var completeUpload = function () {
+    // There is a possibility that the incoming stream was empty, therefore the MPU never started
+    // and can not be finalized.
+    if (multipartUploadID) {
+      cachedClient.completeMultipartUpload(
         {
-          Body: partBuffer,
           Bucket: destinationDetails.Bucket,
           Key: destinationDetails.Key,
           UploadId: multipartUploadID,
-          PartNumber: localPartNumber
+          MultipartUpload: {
+            Parts: partIds
+          }
         },
         function (err, result) {
           if (err)
-            abortUpload('Failed to upload a part to S3: ' + JSON.stringify(err));
+            abortUpload('Failed to complete the multipart upload on S3: ' + JSON.stringify(err));
           else {
-            uploadedSize += partBuffer.length;
-            partIds[localPartNumber - 1] = {
-              ETag: result.ETag,
-              PartNumber: localPartNumber
-            };
-
-            callback({
-              ETag: result.ETag,
-              PartNumber: localPartNumber,
-              receivedSize: receivedSize,
-              uploadedSize: uploadedSize
-            });
+            // Emit both events for backwards compatibility, and to follow the spec.
+            ws.emit('uploaded', result);
+            ws.emit('finish', result);
           }
         }
       );
-    };
-
-    // Overwrite the end method so that we can hijack it to flush the last part and then complete
-    // the multipart upload
-    ws.originalEnd = ws.end;
-    ws.end = function (Part, encoding, callback) {
-      ws.originalEnd(Part, encoding, function afterDoneWithOriginalEnd() {
-        if (Part)
-          absorbBuffer(Part);
-
-        // Upload any remaining data
-        var uploadRemainingData = function () {
-          if (receivedBuffersLength > 0) {
-            uploadHandler(uploadRemainingData);
-            return;
-          }
-
-          if (pendingParts > 0) {
-            setTimeout(uploadRemainingData, 50); // Wait 50 ms for the pending uploads to finish before trying again.
-            return;
-          }
-
-          completeUpload();
-        };
-
-        uploadRemainingData();
-
-        if (typeof callback == 'function')
-          callback();
-      });
-    };
-
-    // Turn all the individual parts we uploaded to S3 into a finalized upload.
-    var completeUpload = function () {
-      // There is a possibility that the incoming stream was empty, therefore the MPU never started
-      // and can not be finalized.
-      if (multipartUploadID) {
-        cachedClient.completeMultipartUpload(
-          {
-            Bucket: destinationDetails.Bucket,
-            Key: destinationDetails.Key,
-            UploadId: multipartUploadID,
-            MultipartUpload: {
-              Parts: partIds
-            }
-          },
-          function (err, result) {
-            if (err)
-              abortUpload('Failed to complete the multipart upload on S3: ' + JSON.stringify(err));
-            else {
-              // Emit both events for backwards compatability, and to follow the spec.
-              ws.emit('uploaded', result);
-              ws.emit('finish', result);
-            }
-          }
-        );
+    }
+  };
+
+  // When a fatal error occurs abort the multipart upload
+  var abortUpload = function (rootError) {
+    cachedClient.abortMultipartUpload(
+      {
+        Bucket: destinationDetails.Bucket,
+        Key: destinationDetails.Key,
+        UploadId: multipartUploadID
+      },
+      function (abortError) {
+        if (abortError)
+          ws.emit('error', rootError + '\n Additionally failed to abort the multipart upload on S3: ' + abortError);
+        else
+          ws.emit('error', rootError);
       }
-    };
-
-    // When a fatal error occurs abort the multipart upload
-    var abortUpload = function (rootError) {
-      cachedClient.abortMultipartUpload(
-        {
-          Bucket: destinationDetails.Bucket,
-          Key: destinationDetails.Key,
-          UploadId: multipartUploadID
-        },
-        function (abortError) {
-          if (abortError)
-            ws.emit('error', rootError + '\n Additionally failed to abort the multipart upload on S3: ' + abortError);
-          else
-            ws.emit('error', rootError);
+    );
+  };
+
+  var createMultipartUpload = function () {
+    cachedClient.createMultipartUpload(
+      destinationDetails,
+      function (err, data) {
+        if (err)
+          ws.emit('error', 'Failed to create a multipart upload on S3: ' + JSON.stringify(err));
+        else {
+          multipartUploadID = data.UploadId;
+          ws.emit('ready');
+          e.emit('ready'); // Internal event
         }
-      );
-    };
+      }
+    );
+  };
 
-    var createMultipartUpload = function () {
-      cachedClient.createMultipartUpload(
-        destinationDetails,
-        function (err, data) {
-          if (err)
-            ws.emit('error', 'Failed to create a multipart upload on S3: ' + JSON.stringify(err));
-          else {
-            multipartUploadID = data.UploadId;
-            ws.emit('ready');
-            e.emit('ready'); // Internal event
-          }
-        }
-      );
-    };
+  return ws;
+}
 
-    if (!cachedClient) {
-      throw new Error('Must configure an S3 client before attempting to create an S3 upload stream.');
-    }
+Client.globalClient = null;
 
-    return ws;
+Client.client = function (options) {
+  Client.globalClient = new Client(options);
+  return Client.globalClient;
+}
+
+Client.upload = function (destinationDetails) {
+  if (!Client.globalClient) {
+    throw new Error('Must configure an S3 client before attempting to create an S3 upload stream.');
   }
-};
+  return Client.globalClient.upload(destinationDetails)
+}
+
+module.exports = Client;
diff --git a/package.json b/package.json
index 4c063f4..21b2224 100644
--- a/package.json
+++ b/package.json
@@ -1,7 +1,7 @@
 {
   "name": "s3-upload-stream",
   "description": "Writeable stream for uploading content of unknown size to S3 via the multipart API.",
-  "version": "1.0.4",
+  "version": "1.0.5",
   "author": {
     "name": "Nathan Peck",
     "email": "nathan@storydesk.com"