From 0089b93555789b2bb32d1409872aa0a88cf806c2 Mon Sep 17 00:00:00 2001
From: Matt Geskey
Date: Wed, 20 Sep 2017 11:45:46 -0700
Subject: [PATCH 01/17] basic refactor of restore

---
 bin/dynamo-restore-from-s3 |   2 +-
 lib/dynamo-restore.js      | 147 ++++++++++++++++++-------------------
 package.json               |   6 +-
 3 files changed, 75 insertions(+), 80 deletions(-)

diff --git a/bin/dynamo-restore-from-s3 b/bin/dynamo-restore-from-s3
index f2b8dd0..bc6d2b3 100755
--- a/bin/dynamo-restore-from-s3
+++ b/bin/dynamo-restore-from-s3
@@ -87,4 +87,4 @@ dynamoRestore.run(function() {
     seconds = Math.floor((diff % 1000 * 60) / 1000);
     console.log('Done! Process completed in %s minutes %s seconds.', minutes, seconds);
     process.exit(0);
-});
\ No newline at end of file
+});

diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index b341fb8..c1fd6db 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -2,20 +2,21 @@
  * lib/dynamo-restore.js
  *
  * By Steven de Salas
- * 
+ *
  * AWS Restore to DynamoDB. Streams an S3 backup to a new DynamoDB table.
  *
  */
 var URL = require('url');
 var util = require('util');
 var AWS = require('aws-sdk');
+var S3S = require('s3-streams');
 var events = require('events');
 var readline = require('readline');
 var DYNAMO_CHUNK_SIZE = 25;

 function DynamoRestore(options) {
     options = options || {};
-    options.concurrency = options.concurrency || 200;
+    options.concurrency = options.concurrency && options.concurrency / DYNAMO_CHUNK_SIZE || 200;
     options.minConcurrency = 1;
     options.maxConcurrency = options.concurrency;
     options.readcapacity = options.readcapacity || 5;
@@ -32,10 +33,13 @@ function DynamoRestore(options) {
     });

     this.options = options;
+    this.batches = this.batches || [];
+    this.requests = this.requests || 0;
+    this.requestItems = this.requestItems || [];
     this.dynamodb = new AWS.DynamoDB();
 }

-// Stick to prototypal inheritance. While this can be done differently 
+// Stick to prototypal inheritance. While this can be done differently
 // in ES6 we'd be making package unusable for older engines (0.10.x->0.12.x)
 util.inherits(DynamoRestore, events.EventEmitter);

@@ -43,19 +47,21 @@ DynamoRestore.prototype.run = function(finishCallback) {
     this._validateS3Backup(this.options);
     this._validateTable(this.options);
     this._startDownload();
+
+    var self = this;
     // Exit program by default if there are no error listeners attached.
-    this.on('error', (function(message) {
+    this.on('error', function(message) {
         if (finishCallback) {
             finishCallback(message);
         }
-        if (this.listeners('error').length <= 1) {
+        if (self.listeners('error').length <= 1) {
             throw new Error(message);
         }
-    }).bind(this));
+    });
     // Finish off by updating write capacity to end-state (if needed)
-    this.on('finish', (function() {
-        var dynamodb = this.dynamodb,
-            options = this.options;
+    this.on('finish', function() {
+        var dynamodb = self.dynamodb,
+            options = self.options;
         // Do we need to update write capacity?
         if (options.writecapacity) {
             dynamodb.updateTable({
@@ -68,7 +74,7 @@ DynamoRestore.prototype.run = function(finishCallback) {
         } else {
             finishCallback();
         }
-    }).bind(this));
+    });
 };

 DynamoRestore.prototype._validateS3Backup = function(options) {
@@ -134,9 +140,10 @@ DynamoRestore.prototype._startDownload = function() {
             else this.emit('error', util.format('Error downloading file from s3: %s', error));
             return;
         }
+        //var downloadStream = S3S.ReadStream(s3, params);
         var downloadStream = s3.getObject(params).createReadStream();
         downloadStream.pause();
-        // All good, start downloading 
+        // All good, start downloading
         this.emit('start-download', meta);
         this.readline = readline.createInterface({
             terminal: false,
@@ -156,42 +163,41 @@ DynamoRestore.prototype._startDownload = function() {
     }).bind(this));
 };

+DynamoRestore.prototype._processFirstLine = function(line) {
+    this.readline.pause();
+    item = JSON.parse(line);
+    this.template = item;
+    this._extractSchema(this.template);
+    this._pushItem(this.template);
+}
+
 DynamoRestore.prototype._processLine = function(line) {
-    this.batches = this.batches || [];
-    this.requests = this.requests || [];
-    this.requestItems = this.requestItems || [];
     // First Line?
     if (!this.template) {
-        // Use this to extract schema information
-        this.readline.pause();
-        this.template = JSON.parse(line);
-        this._extractSchema(this.template);
-    }
-    // Keep tabs on how much data is being consumed
-    this.readline.meta.RemainingLength -= line.length + 1;
-    // Create batches of 25 records each
-    this.requestItems.push({ PutRequest: { Item: JSON.parse(line) } });
-    if (this.requestItems.length === DYNAMO_CHUNK_SIZE) {
-        this.batches.push({
-            items: this.requestItems.splice(0, DYNAMO_CHUNK_SIZE),
-            attempts: 0
-        });
+        this._processFirstLine(line);
+    } else {
+        item = JSON.parse(line)
+        this._pushItem(item)
+        this.readline.meta.RemainingLength -= line.length + 1;
     }
-    // Writing to Dynamo is usually slower than reading from S3,
-    // and we want to avoid clogging up memory or writing to disk.
-    // The list of batches waiting for DynamoDB to process would
-    // quickly get out of hand here, so an easy way around this is to
-    // stop reading from S3 when the number of requests in flight goes past a
-    // certain size, and then continue reading when the number is reduced.
-    if (this.requests.length >= this.options.concurrency) {
-        //Too many requests! Pausing download..
-        this.readline.pause();
+};
+
+DynamoRestore.prototype._pushItem = function(item) {
+    this.requestItems.push({ PutRequest: { Item: item } });
+    if (this.requestItems.length >= DYNAMO_CHUNK_SIZE) {
+        this._sendBatch(this.requestItems.splice(0, DYNAMO_CHUNK_SIZE))
     }
-    // Send the next batch if we are not exceeding concurrency (ie max requests)
-    else if (this.tableready && this.batches.length) {
-        this._sendBatch();
+}
+
+DynamoRestore.prototype._concatItems = function(items) {
+    var self = this;
+    items.forEach(function(item) {
+        this.requestItems.push({ PutRequest: { Item: item } });
+    })
+    if (this.requestItems.length >= DYNAMO_CHUNK_SIZE) {
+        this._sendBatch(this.requestItems.splice(0, DYNAMO_CHUNK_SIZE))
     }
-};
+}

 DynamoRestore.prototype._extractSchema = function(template) {
     var partitionkey = this.options.partitionkey;
@@ -210,7 +216,7 @@ DynamoRestore.prototype._extractSchema = function(template) {
     if (likelyCandidates.length === 0) {
         return this.emit('error', 'Fatal Error. Cannot determine --partitionkey from backup, please supply it manually.');
     } else {
-        // Pick the shortest one 
+        // Pick the shortest one
         partitionkey = likelyCandidates.sort(function(a, b) {
             return b.length - a.length;
         }).pop();
@@ -287,72 +293,59 @@ DynamoRestore.prototype._checkTableReady = function(error, data) {
     }
 };

-DynamoRestore.prototype._sendBatch = function() {
+DynamoRestore.prototype._sendBatch = function(items) {
     // Prepare
     var params = { RequestItems: {} },
         dynamo = this.dynamodb,
-        options = this.options;
-    batch = this.batches.shift();
-    params.RequestItems[options.table] = batch.items;
+        options = this.options,
+        self = this;
+    params.RequestItems[options.table] = items;

     // Send
-    this.requests.push(dynamo.batchWriteItem(params, (function(error, data) {
-        this.requests.shift();
+    this.requests++;
+    dynamo.batchWriteItem(params, function(error, data) {
+        self.requests--;
         if (error) {
-            // Problem? Check the number of attempts
-            if (batch.attempts > options.concurrency) {
-                if (options.stopOnFailure) {
-                    return this.emit('error', 'Fatal Error. Failed to upload batch. Ending process. \n' + JSON.stringify(batch));
-                } else {
-                    this.emit('warning', 'Failed to upload same batch too many times, removing from queue.. \n' + JSON.stringify(batch));
-                }
-            } else {
-                this.emit('warning', 'Error processing batch, putting back in the queue.');
-                batch.attempts++;
-                this.batches.push(batch);
-            }
+            self.emit('warning', 'Error processing batch, putting back in the queue.');
+            self.RequestItems.concat(items);
         }
         var unprocessedItems = data && data.UnprocessedItems && data.UnprocessedItems[options.table] || [];
         if (unprocessedItems.length) {
             // Retry unprocessed items
-            this.emit('warning', unprocessedItems.length + ' unprocessed items. Add to queue and back off a bit.');
-            this.batches.push({
-                items: unprocessedItems,
-                attempts: batch.attempts + 1
-            });
+            self.emit('warning', unprocessedItems.length + ' unprocessed items. Add to queue and back off a bit.');
+            self.requestItems.concat(unprocessedItems);
             // Back off a bit..
-            options.concurrency--;
+            options.concurrency /= 1.2;
             if (options.concurrency < options.minConcurrency) {
                 options.concurrency = options.minConcurrency;
             }
         } else {
             // Successful upload, increase concurrency again..
-            options.concurrency++;
-            if (options.concurrency > options.maxConcurrency) {
-                options.concurrency = options.maxConcurrency;
+            if (options.concurrency < options.maxConcurrency) {
+                options.concurrency++;
             }
         }
         // Continue downloading data...
-        if (this.tableready && this.requests.length < options.concurrency * 0.8) {
-            this.readline.resume();
+        if (self.requests < options.concurrency) {
+            self.readline.resume();
         }
-        this.emit('finish-batch', this.requests.length);
-    }).bind(this)));
+        self.emit('finish-batch', self.requests);
+    });

     // Notify
-    this.emit('send-batch', this.batches.length, this.requests.length, this.readline.meta);
+    this.emit('send-batch', this.requestItems.length, this.requests, this.readline.meta);
 };

 DynamoRestore.prototype._finishBatches = function() {
     if (!this.batches.length) {
-        if (!this.requests.length) {
+        if (!this.requests) {
             // Finish only if there is nothing left to wait for.
             this.emit('finish');
             return;
         }
     } else {
         // Send remaining batches
-        if (this.requests.length < this.options.concurrency) {
+        if (this.requests < this.options.concurrency) {
             this._sendBatch();
         }
     }
@@ -360,4 +353,4 @@ DynamoRestore.prototype._finishBatches = function() {
     setTimeout(this._finishBatches.bind(this), 200);
 };

-module.exports = DynamoRestore;
\ No newline at end of file
+module.exports = DynamoRestore;

diff --git a/package.json b/package.json
index bc46c90..f6a277a 100644
--- a/package.json
+++ b/package.json
@@ -16,7 +16,8 @@
     "backup",
     "streaming",
     "s3",
-    "stream"
+    "stream",
+    "s3-streams"
   ],
   "bin": {
     "dynamo-backup-to-s3": "./bin/dynamo-backup-to-s3"
@@ -28,7 +29,8 @@
     "lodash": "^3.10.1",
     "moment": "^2.10.6",
     "moment-range": "^2.0.3",
-    "s3-streaming-upload": "^0.2.1"
+    "s3-streaming-upload": "^0.2.1",
+    "s3-streams": "^0.3.0"
   },
   "engines": {
     "node": ">=0.10.0"
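Note on the refactor above, isolated from the diff: items accumulate until a full
25-item BatchWriteItem chunk exists, unprocessed items are re-queued, and the S3
line reader is paused while too many writes are in flight. A minimal sketch of
that shape under assumed names -- `rl`, `dynamodb` and `TABLE` are stand-ins, not
the module's actual API:

    // Sketch only: chunked writes with readline backpressure.
    var DYNAMO_CHUNK_SIZE = 25;   // BatchWriteItem hard limit per request
    var MAX_IN_FLIGHT = 20;       // assumed budget of concurrent batches
    var buffer = [];              // pending PutRequests
    var inFlight = 0;

    rl.on('line', function (line) {
        buffer.push({ PutRequest: { Item: JSON.parse(line) } });
        if (buffer.length >= DYNAMO_CHUNK_SIZE) send(buffer.splice(0, DYNAMO_CHUNK_SIZE));
        if (inFlight >= MAX_IN_FLIGHT) rl.pause();   // stop reading from S3
    });

    function send(items) {
        var params = { RequestItems: {} };
        params.RequestItems[TABLE] = items;
        inFlight++;
        dynamodb.batchWriteItem(params, function (err, data) {
            inFlight--;
            if (err) buffer.push.apply(buffer, items);   // requeue whole batch
            var leftover = (data && data.UnprocessedItems && data.UnprocessedItems[TABLE]) || [];
            buffer.push.apply(buffer, leftover);         // requeue throttled items
            if (inFlight < MAX_IN_FLIGHT) rl.resume();   // release backpressure
        });
    }

The requeueing in the sketch uses push.apply rather than the Array.concat call
the early patches use; later patches in this series make the same change, since
concat returns a new array instead of extending the queue in place.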
From d972da9a44f9cd5703d9c576fec350340c82cac7 Mon Sep 17 00:00:00 2001
From: Matt Geskey
Date: Wed, 20 Sep 2017 12:05:13 -0700
Subject: [PATCH 02/17] Added timeout between _sendBatch when concurrency
 exceeded

---
 lib/dynamo-restore.js | 62 ++++++++++++++++++++++++-------------------
 1 file changed, 34 insertions(+), 28 deletions(-)

diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index c1fd6db..1b6dc9a 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -164,7 +164,6 @@ DynamoRestore.prototype._startDownload = function() {
 };

 DynamoRestore.prototype._processFirstLine = function(line) {
-    this.readline.pause();
     item = JSON.parse(line);
     this.template = item;
     this._extractSchema(this.template);
@@ -174,6 +173,7 @@ DynamoRestore.prototype._processLine = function(line) {
     // First Line?
     if (!this.template) {
+        this.readline.pause()
         this._processFirstLine(line);
     } else {
         item = JSON.parse(line)
@@ -302,35 +302,41 @@ DynamoRestore.prototype._sendBatch = function(items) {
     params.RequestItems[options.table] = items;

     // Send
-    this.requests++;
-    dynamo.batchWriteItem(params, function(error, data) {
-        self.requests--;
-        if (error) {
-            self.emit('warning', 'Error processing batch, putting back in the queue.');
-            self.RequestItems.concat(items);
-        }
-        var unprocessedItems = data && data.UnprocessedItems && data.UnprocessedItems[options.table] || [];
-        if (unprocessedItems.length) {
-            // Retry unprocessed items
-            self.emit('warning', unprocessedItems.length + ' unprocessed items. Add to queue and back off a bit.');
-            self.requestItems.concat(unprocessedItems);
-            // Back off a bit..
-            options.concurrency /= 1.2;
-            if (options.concurrency < options.minConcurrency) {
-                options.concurrency = options.minConcurrency;
-            }
-        } else {
-            // Successful upload, increase concurrency again..
-            if (options.concurrency < options.maxConcurrency) {
-                options.concurrency++;
-            }
-        }
-        // Continue downloading data...
+    var waitForConcurrency = function() {
         if (self.requests < options.concurrency) {
-            self.readline.resume();
+            self.requests++;
+            return dynamo.batchWriteItem(params, function(error, data) {
+                self.requests--;
+                if (error) {
+                    self.emit('warning', 'Error processing batch, putting back in the queue.');
+                    self.RequestItems.concat(items);
+                }
+                var unprocessedItems = data && data.UnprocessedItems && data.UnprocessedItems[options.table] || [];
+                if (unprocessedItems.length) {
+                    // Retry unprocessed items
+                    self.emit('warning', unprocessedItems.length + ' unprocessed items. Add to queue and back off a bit.');
+                    self.requestItems.concat(unprocessedItems);
+                    // Back off a bit..
+                    options.concurrency /= 1.2;
+                    if (options.concurrency < options.minConcurrency) {
+                        options.concurrency = options.minConcurrency;
+                    }
+                } else {
+                    // Successful upload, increase concurrency again..
+                    if (options.concurrency < options.maxConcurrency) {
+                        options.concurrency++;
+                    }
+                }
+                // Continue downloading data...
+                self.readline.resume();
+                self.emit('finish-batch', self.requests);
+            });
+        } else {
+            self.readline.pause();
+            setTimeout(waitForConcurrency, 100);
         }
-        self.emit('finish-batch', self.requests);
-    });
+    };
+    waitForConcurrency();

     // Notify
     this.emit('send-batch', this.requestItems.length, this.requests, this.readline.meta);

From e8cefa4d1b553c0ce2b97e5d9513bb2468dd1978 Mon Sep 17 00:00:00 2001
From: Matt Geskey
Date: Wed, 20 Sep 2017 12:11:38 -0700
Subject: [PATCH 03/17] moved send-batch emit

---
 lib/dynamo-restore.js | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index 1b6dc9a..77648c5 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -304,6 +304,7 @@ DynamoRestore.prototype._sendBatch = function(items) {
     // Send
     var waitForConcurrency = function() {
         if (self.requests < options.concurrency) {
+            this.emit('send-batch', this.requestItems.length, this.requests, this.readline.meta);
             self.requests++;
             return dynamo.batchWriteItem(params, function(error, data) {
                 self.requests--;
@@ -337,9 +338,6 @@ DynamoRestore.prototype._sendBatch = function(items) {
         }
     };
     waitForConcurrency();
-
-    // Notify
-    this.emit('send-batch', this.requestItems.length, this.requests, this.readline.meta);
 };

From e1ab3ae1481ad8d286cd4839d6412247aa651716 Mon Sep 17 00:00:00 2001
From: Matt Geskey
Date: Wed, 20 Sep 2017 12:46:51 -0700
Subject: [PATCH 04/17] attempt to time requests

---
 lib/dynamo-restore.js | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index 77648c5..9e31a82 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -33,8 +33,8 @@ function DynamoRestore(options) {
     });

     this.options = options;
-    this.batches = this.batches || [];
     this.requests = this.requests || 0;
+    this.lastRequest = Date.now();
     this.requestItems = this.requestItems || [];
     this.dynamodb = new AWS.DynamoDB();
 }
@@ -298,13 +298,17 @@ DynamoRestore.prototype._sendBatch = function(items) {
     var params = { RequestItems: {} },
         dynamo = this.dynamodb,
         options = this.options,
-        self = this;
+        self = this,
+        interval = (1000 / options.maxConcurrency);
     params.RequestItems[options.table] = items;

     // Send
     var waitForConcurrency = function() {
-        if (self.requests < options.concurrency) {
-            this.emit('send-batch', this.requestItems.length, this.requests, this.readline.meta);
+        now = Date.now();
+        nextRun = now - lastRequest - interval;
+        if (self.requests < options.concurrency && nextRun <= 0) {
+            lastRequest = now;
+            self.emit('send-batch', self.requestItems.length, self.requests, self.readline.meta);
             self.requests++;
             return dynamo.batchWriteItem(params, function(error, data) {
                 self.requests--;
@@ -334,7 +338,7 @@ DynamoRestore.prototype._sendBatch = function(items) {
             });
         } else {
             self.readline.pause();
-            setTimeout(waitForConcurrency, 100);
+            setTimeout(waitForConcurrency, nextRun);
         }
     };
     waitForConcurrency();
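The timing attempt above reduces to one line of arithmetic: the delay between
sends is the reciprocal of the target batch rate. Worked through with assumed
numbers (the 500 items/s figure is an example, not a measured default):

    // Illustrative pacing math for the interval used above.
    var itemsPerSecond = 500;                     // assumed target write rate
    var batchesPerSecond = itemsPerSecond / 25;   // 20 BatchWriteItem calls/s
    var interval = 1000 / batchesPerSecond;       // 50 ms between sends
    console.log(batchesPerSecond, interval);      // 20, 50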
From 1ec284483e1ef957cd1392ba798f183f6cd551e3 Mon Sep 17 00:00:00 2001
From: Matt Geskey
Date: Wed, 20 Sep 2017 12:52:42 -0700
Subject: [PATCH 05/17] purged the unclean

---
 lib/dynamo-restore.js | 13 +++++--------
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index 9e31a82..55591d6 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -152,10 +152,7 @@ DynamoRestore.prototype._startDownload = function() {
         .on('line', this._processLine.bind(this))
         .on('close', (function() {
             this.emit('finish-download');
-            this.batches.push({
-                items: this.requestItems.splice(0, DYNAMO_CHUNK_SIZE),
-                attempts: 0
-            });
+            this._sendBatch(this.requestItems.splice(0, DYNAMO_CHUNK_SIZE));
             this._finishBatches();
         }).bind(this));
         this.readline.meta = meta;
@@ -287,7 +284,7 @@ DynamoRestore.prototype._checkTableReady = function(error, data) {
         // All ready, now we can start inserting records
         this.tableready = true;
         this.readline.resume();
-        while (this.batches.length) { this._sendBatch(); }
+        while (this.requestItems.length) { this._sendBatch(this.requestItems.splice(0, DYNAMO_CHUNK_SIZE)); }
     } else {
         setTimeout(dynamodb.describeTable.bind(dynamodb, { TableName: data.Table.TableName }, this._checkTableReady.bind(this)), 1000);
     }
 }
@@ -305,9 +302,9 @@ DynamoRestore.prototype._sendBatch = function(items) {
     // Send
     var waitForConcurrency = function() {
         now = Date.now();
-        nextRun = now - lastRequest - interval;
+        nextRun = now - self.lastRequest - interval;
         if (self.requests < options.concurrency && nextRun <= 0) {
-            lastRequest = now;
+            self.lastRequest = now;
             self.emit('send-batch', self.requestItems.length, self.requests, self.readline.meta);
             self.requests++;
             return dynamo.batchWriteItem(params, function(error, data) {
@@ -345,7 +342,7 @@ DynamoRestore.prototype._sendBatch = function(items) {
 };

 DynamoRestore.prototype._finishBatches = function() {
-    if (!this.batches.length) {
+    if (!this.requestItems.length) {
         if (!this.requests) {
             // Finish only if there is nothing left to wait for.
             this.emit('finish');

From 4b3650c9f780b494ee806556348d3b97b9148b07 Mon Sep 17 00:00:00 2001
From: Matt Geskey
Date: Wed, 20 Sep 2017 12:59:03 -0700
Subject: [PATCH 06/17] redid timing math

---
 lib/dynamo-restore.js | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index 55591d6..bcc8202 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -301,11 +301,7 @@ DynamoRestore.prototype._sendBatch = function(items) {
     // Send
     var waitForConcurrency = function() {
-        now = Date.now();
-        nextRun = now - self.lastRequest - interval;
-        if (self.requests < options.concurrency && nextRun <= 0) {
-            self.lastRequest = now;
-            self.emit('send-batch', self.requestItems.length, self.requests, self.readline.meta);
+        if (self.requests < options.concurrency) {
             self.requests++;
             return dynamo.batchWriteItem(params, function(error, data) {
                 self.requests--;
@@ -334,11 +330,16 @@ DynamoRestore.prototype._sendBatch = function(items) {
             });
         } else {
-            self.readline.pause();
+            if (self.requests >= options.concurrency) {
+                self.readline.pause();
+            }
             setTimeout(waitForConcurrency, nextRun);
         }
     };
     waitForConcurrency();
+
+    // Notify
+    this.emit('send-batch', this.requestItems.length, this.requests, this.readline.meta);
 };

From d1fa68d35088db420056da01986d7d02eda41f88 Mon Sep 17 00:00:00 2001
From: Matt Geskey
Date: Wed, 20 Sep 2017 20:20:37 +0000
Subject: [PATCH 07/17] Revert "attempt to time requests"

---
 lib/dynamo-restore.js | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)

diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index bcc8202..f038373 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -33,8 +33,8 @@ function DynamoRestore(options) {
     });

     this.options = options;
+    this.batches = this.batches || [];
     this.requests = this.requests || 0;
-    this.lastRequest = Date.now();
     this.requestItems = this.requestItems || [];
     this.dynamodb = new AWS.DynamoDB();
 }
@@ -295,13 +295,13 @@ DynamoRestore.prototype._sendBatch = function(items) {
     var params = { RequestItems: {} },
         dynamo = this.dynamodb,
         options = this.options,
-        self = this,
-        interval = (1000 / options.maxConcurrency);
+        self = this;
     params.RequestItems[options.table] = items;

     // Send
     var waitForConcurrency = function() {
         if (self.requests < options.concurrency) {
+            self.emit('send-batch', self.requestItems.length, self.requests, self.readline.meta);
             self.requests++;
             return dynamo.batchWriteItem(params, function(error, data) {
                 self.requests--;
@@ -330,16 +330,11 @@ DynamoRestore.prototype._sendBatch = function(items) {
                 self.emit('finish-batch', self.requests);
             });
         } else {
-            if (self.requests >= options.concurrency) {
-                self.readline.pause();
-            }
-            setTimeout(waitForConcurrency, nextRun);
+            self.readline.pause();
+            setTimeout(waitForConcurrency, 100);
         }
     };
     waitForConcurrency();
-
-    // Notify
-    this.emit('send-batch', this.requestItems.length, this.requests, this.readline.meta);
 };
From f6bec0bde00f83343f117b3d3153f196456a740d Mon Sep 17 00:00:00 2001
From: Matt Geskey
Date: Wed, 20 Sep 2017 14:28:28 -0700
Subject: [PATCH 08/17] decrease concurrency when running fast

---
 lib/dynamo-restore.js | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index f038373..35950f9 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -296,6 +296,7 @@ DynamoRestore.prototype._sendBatch = function(items) {
         dynamo = this.dynamodb,
         options = this.options,
         self = this;
+    var interval = 1000 / options.maxConcurrency;
     params.RequestItems[options.table] = items;

     // Send
@@ -303,6 +304,7 @@ DynamoRestore.prototype._sendBatch = function(items) {
         if (self.requests < options.concurrency) {
             self.emit('send-batch', self.requestItems.length, self.requests, self.readline.meta);
             self.requests++;
+            start = Date.now();
             return dynamo.batchWriteItem(params, function(error, data) {
                 self.requests--;
                 if (error) {
@@ -319,6 +321,10 @@ DynamoRestore.prototype._sendBatch = function(items) {
                     if (options.concurrency < options.minConcurrency) {
                         options.concurrency = options.minConcurrency;
                     }
+                } else if (Date.now() - start < interval){
+                    if (options.concurrency > options.minConcurrency) {
+                        options.concurrency--;
+                    }
                 } else {
                     // Successful upload, increase concurrency again..
                     if (options.concurrency < options.maxConcurrency) {
@@ -331,7 +337,7 @@ DynamoRestore.prototype._sendBatch = function(items) {
             });
         } else {
             self.readline.pause();
-            setTimeout(waitForConcurrency, 100);
+            setTimeout(waitForConcurrency, interval);
         }
     };
     waitForConcurrency();

From 753bc3a7ea89cbdfa191d0d95bdc2e77082f2e78 Mon Sep 17 00:00:00 2001
From: Matt Geskey
Date: Wed, 20 Sep 2017 14:31:25 -0700
Subject: [PATCH 09/17] debug printing

---
 bin/dynamo-restore-from-s3 | 2 +-
 lib/dynamo-restore.js      | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/bin/dynamo-restore-from-s3 b/bin/dynamo-restore-from-s3
index bc6d2b3..c9a120c 100755
--- a/bin/dynamo-restore-from-s3
+++ b/bin/dynamo-restore-from-s3
@@ -76,7 +76,7 @@ dynamoRestore.on('start-download', function(streamMeta) {
 });

 dynamoRestore.on('send-batch', function(batches, requests, streamMeta) {
-    console.log('Batch sent. %d in flight. %s remaining to download...', requests, translate(streamMeta.RemainingLength));
+    console.log('Batch sent. %d/%d in flight. %s remaining to download...', requests, batches, translate(streamMeta.RemainingLength));
 });

 // Start Process

diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index 35950f9..adc4359 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -302,7 +302,7 @@ DynamoRestore.prototype._sendBatch = function(items) {
     // Send
     var waitForConcurrency = function() {
         if (self.requests < options.concurrency) {
-            self.emit('send-batch', self.requestItems.length, self.requests, self.readline.meta);
+            self.emit('send-batch', options.concurrency, self.requests, self.readline.meta);
             self.requests++;
             start = Date.now();
             return dynamo.batchWriteItem(params, function(error, data) {
@@ -337,7 +337,7 @@ DynamoRestore.prototype._sendBatch = function(items) {
             });
         } else {
             self.readline.pause();
-            setTimeout(waitForConcurrency, interval);
+            setTimeout(waitForConcurrency, 0);
         }
     };
     waitForConcurrency();

From 7af25528f3a46b17fe04988f936b4c4ddbcb3747 Mon Sep 17 00:00:00 2001
From: Matt Geskey
Date: Thu, 21 Sep 2017 11:49:34 -0700
Subject: [PATCH 10/17] added option to read from local file rather than s3

S3 has a chance of randomly closing the connection before the download is
finished, which makes restoring from large files impossible. This is a hack:
download the file quickly first, then do the much slower restore.
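In isolation, the fallback described above amounts to: if the source is a local
path rather than an s3:// URI, stream it with fs and feed the same readline
pipeline. A minimal standalone sketch of that idea (hypothetical names, not the
module's exact code -- the real implementation lands in the diff below and is
repaired in patch 12):

    // Sketch: line-by-line restore source that works for a local file.
    // Assumes `source` is a filesystem path; the S3 branch is omitted.
    var fs = require('fs');
    var readline = require('readline');

    function openBackupStream(source, onLine, onDone) {
        var input = fs.createReadStream(source);   // plain file stream
        var remaining = fs.statSync(source).size;  // for progress reporting
        var rl = readline.createInterface({ terminal: false, input: input });
        rl.on('line', function (line) {
            remaining -= line.length + 1;          // track bytes left
            onLine(JSON.parse(line), remaining);
        });
        rl.on('close', onDone);
        return rl;                                 // caller can pause()/resume()
    }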
---
 lib/dynamo-restore.js | 93 +++++++++++++++++++++++--------------------
 package.json          |  2 -
 2 files changed, 50 insertions(+), 45 deletions(-)

diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index adc4359..2de7b08 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -6,10 +6,10 @@
  * AWS Restore to DynamoDB. Streams an S3 backup to a new DynamoDB table.
  *
  */
+var fs = require('fs');
 var URL = require('url');
 var util = require('util');
 var AWS = require('aws-sdk');
-var S3S = require('s3-streams');
 var events = require('events');
 var readline = require('readline');
 var DYNAMO_CHUNK_SIZE = 25;
@@ -33,6 +33,7 @@ function DynamoRestore(options) {
     });

     this.options = options;
+    this.options.localfile = false;
     this.batches = this.batches || [];
     this.requests = this.requests || 0;
     this.requestItems = this.requestItems || [];
     this.dynamodb = new AWS.DynamoDB();
 }
@@ -81,7 +82,11 @@ DynamoRestore.prototype._validateS3Backup = function(options) {
     // Check S3 URI
     var url = URL.parse(options.source);
     if (!url || url.protocol !== 's3:') {
-        return this.emit('error', 'Please provide an s3 URI as file source (ie s3://mybucketname/folder/mydynamotable.json)');
+        if (fs.existsSync(options.source)) {
+            return this.localfile = true;
+        } else {
+            return this.emit('error', 'Please provide an s3 URI or real file source (ie s3://mybucketname/folder/mydynamotable.json)');
+        }
     }
     if (!url.pathname || !url.hostname || url.search || url.hash || url.auth) {
         return this.emit('error', 'Please provide a simple s3 URI as file source (ie s3://mybucketname/folder/mydynamotable.json)');
@@ -128,36 +133,48 @@ DynamoRestore.prototype._checkTableExists = function(error, data) {
 };

 DynamoRestore.prototype._startDownload = function() {
-    var s3 = new AWS.S3();
-    var params = {
-        Bucket: this.options.s3bucket,
-        Key: this.options.s3path
-    };
-    // First determine if file exists in s3
-    s3.headObject(params, (function(error, meta) {
-        if (error) {
-            if (error.code === 'NotFound') this.emit('error', util.format('Could not find file in s3. %s', this.options.source));
-            else this.emit('error', util.format('Error downloading file from s3: %s', error));
-            return;
-        }
-        //var downloadStream = S3S.ReadStream(s3, params);
-        var downloadStream = s3.getObject(params).createReadStream();
-        downloadStream.pause();
-        // All good, start downloading
-        this.emit('start-download', meta);
-        this.readline = readline.createInterface({
+    var options = this.options;
+    var self = this;
+
+    var createReadline(stream, meta) {
+        self.emit('start-download', meta);
+        self.readline = readline.createInterface({
             terminal: false,
             input: downloadStream
         })
-        .on('line', this._processLine.bind(this))
-        .on('close', (function() {
-            this.emit('finish-download');
-            this._sendBatch(this.requestItems.splice(0, DYNAMO_CHUNK_SIZE));
-            this._finishBatches();
-        }).bind(this));
-        this.readline.meta = meta;
-        this.readline.meta.RemainingLength = meta.ContentLength;
-    }).bind(this));
+        .on('line', self._processLine.bind(self))
+        .on('close', function() {
+            self.emit('finish-download');
+            self._sendBatch(self.requestItems.splice(0, DYNAMO_CHUNK_SIZE));
+            self._finishBatches();
+        });
+        self.readline.meta = meta;
+        self.readline.meta.RemainingLength = meta.ContentLength;
+    }
+
+    if (options.localfile) {
+        fileStream = fs.ReadStream(options.source);
+        meta = {RemainingLength: fs.statSync(options.source).size};
+        createReadline(fileStream, meta);
+    } else {
+        var s3 = new AWS.S3();
+        var params = {
+            Bucket: self.options.s3bucket,
+            Key: self.options.s3path
+        };
+        // First determine if file exists in s3
+        s3.headObject(params, function(error, meta) {
+            if (error) {
+                if (error.code === 'NotFound') self.emit('error', util.format('Could not find file in s3. %s', self.options.source));
+                else self.emit('error', util.format('Error downloading file from s3: %s', error));
+                return;
+            }
+            var downloadStream = s3.getObject(params).createReadStream();
+            downloadStream.pause();
+            // All good, start downloading
+            createReadline(downloadStream, meta);
+        });
+    }
 };

@@ -170,11 +187,11 @@ DynamoRestore.prototype._processFirstLine = function(line) {
 DynamoRestore.prototype._processLine = function(line) {
     // First Line?
     if (!this.template) {
-        this.readline.pause()
+        this.readline.pause();
         this._processFirstLine(line);
     } else {
-        item = JSON.parse(line)
-        this._pushItem(item)
+        item = JSON.parse(line);
+        this._pushItem(item);
         this.readline.meta.RemainingLength -= line.length + 1;
     }
 };

@@ -186,16 +203,6 @@ DynamoRestore.prototype._pushItem = function(item) {
     this.requestItems.push({ PutRequest: { Item: item } });
     if (this.requestItems.length >= DYNAMO_CHUNK_SIZE) {
         this._sendBatch(this.requestItems.splice(0, DYNAMO_CHUNK_SIZE))
     }
 }

-DynamoRestore.prototype._concatItems = function(items) {
-    var self = this;
-    items.forEach(function(item) {
-        this.requestItems.push({ PutRequest: { Item: item } });
-    })
-    if (this.requestItems.length >= DYNAMO_CHUNK_SIZE) {
-        this._sendBatch(this.requestItems.splice(0, DYNAMO_CHUNK_SIZE))
-    }
-}
-
 DynamoRestore.prototype._extractSchema = function(template) {
     var partitionkey = this.options.partitionkey;
     if (partitionkey) {

@@ -337,7 +344,7 @@ DynamoRestore.prototype._sendBatch = function(items) {
             });
         } else {
             self.readline.pause();
-            setTimeout(waitForConcurrency, 0);
+            setTimeout(waitForConcurrency, 100);
         }
     };
     waitForConcurrency();

diff --git a/package.json b/package.json
index f6a277a..0668bea 100644
--- a/package.json
+++ b/package.json
@@ -17,7 +17,6 @@
     "streaming",
     "s3",
     "stream",
-    "s3-streams"
   ],
   "bin": {
     "dynamo-backup-to-s3": "./bin/dynamo-backup-to-s3"
@@ -30,7 +29,6 @@
     "moment": "^2.10.6",
     "moment-range": "^2.0.3",
     "s3-streaming-upload": "^0.2.1",
-    "s3-streams": "^0.3.0"
   },
   "engines": {
     "node": ">=0.10.0"

From d952d3377eb7fb89e07fe78a01466a2cb3281b1a Mon Sep 17 00:00:00 2001
From: Matt Geskey
Date: Thu, 21 Sep 2017 11:55:43 -0700
Subject: [PATCH 11/17] removed timer based throttling

Most of the time was spent in node (CPU bound). Timing only how long the
request took failed to account for overhead, and thus throttled down to 20%
of the target.

---
 lib/dynamo-restore.js | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index 2de7b08..cf30231 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -311,7 +311,6 @@ DynamoRestore.prototype._sendBatch = function(items) {
         if (self.requests < options.concurrency) {
             self.emit('send-batch', options.concurrency, self.requests, self.readline.meta);
             self.requests++;
-            start = Date.now();
             return dynamo.batchWriteItem(params, function(error, data) {
                 self.requests--;
                 if (error) {
@@ -324,11 +323,6 @@ DynamoRestore.prototype._sendBatch = function(items) {
                     self.emit('warning', unprocessedItems.length + ' unprocessed items. Add to queue and back off a bit.');
                     self.requestItems.concat(unprocessedItems);
                     // Back off a bit..
-                    options.concurrency /= 1.2;
-                    if (options.concurrency < options.minConcurrency) {
-                        options.concurrency = options.minConcurrency;
-                    }
-                } else if (Date.now() - start < interval){
                     if (options.concurrency > options.minConcurrency) {
                         options.concurrency--;
                     }
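The 20% figure in the commit message above follows directly from ignoring CPU
time: the timer only spaced out request starts, but each cycle also burned
unmeasured synchronous time (JSON parsing, batching), so the real period was
the sum of the two. Worked through with assumed numbers:

    // Illustrative only; the 200 ms of CPU per cycle is an assumption.
    var targetInterval = 50;   // ms between sends aimed for (20 batches/s)
    var cpuPerCycle = 200;     // unmeasured synchronous work per cycle
    var actualRate = 1000 / (targetInterval + cpuPerCycle);  // 4 batches/s
    console.log(actualRate / (1000 / targetInterval));       // 0.2 -> ~20% of target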
From 424e25ef7e918a9788891c39789a9dddb79ef4ce Mon Sep 17 00:00:00 2001
From: Matt Geskey
Date: Thu, 21 Sep 2017 21:42:25 +0000
Subject: [PATCH 12/17] fixed for testing

---
 lib/dynamo-restore.js | 12 ++++++------
 package.json          |  4 ++--
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index cf30231..29595c5 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -83,7 +83,7 @@ DynamoRestore.prototype._validateS3Backup = function(options) {
     var url = URL.parse(options.source);
     if (!url || url.protocol !== 's3:') {
         if (fs.existsSync(options.source)) {
-            return this.localfile = true;
+            return this.options.localfile = true;
         } else {
             return this.emit('error', 'Please provide an s3 URI or real file source (ie s3://mybucketname/folder/mydynamotable.json)');
         }
@@ -136,11 +136,11 @@ DynamoRestore.prototype._startDownload = function() {
     var options = this.options;
     var self = this;

-    var createReadline(stream, meta) {
+    var createReadline = function(stream, meta) {
         self.emit('start-download', meta);
         self.readline = readline.createInterface({
             terminal: false,
-            input: downloadStream
+            input: stream
         })
         .on('line', self._processLine.bind(self))
         .on('close', function() {
@@ -154,7 +154,7 @@ DynamoRestore.prototype._startDownload = function() {

     if (options.localfile) {
         fileStream = fs.ReadStream(options.source);
-        meta = {RemainingLength: fs.statSync(options.source).size};
+        meta = {ContentLength: fs.statSync(options.source).size};
         createReadline(fileStream, meta);
     } else {
         var s3 = new AWS.S3();
@@ -346,7 +346,7 @@ DynamoRestore.prototype._sendBatch = function(items) {

 DynamoRestore.prototype._finishBatches = function() {
     if (!this.requestItems.length) {
-        if (!this.requests) {
+        if (this.requests === 0) {
             // Finish only if there is nothing left to wait for.
             this.emit('finish');
             return;
         }
     } else {
         // Send remaining batches
         if (this.requests < this.options.concurrency) {
-            this._sendBatch();
+            this._sendBatch(this.requestItems.splice(0, DYNAMO_CHUNK_SIZE));
         }
     }
     // Repeat until finished

diff --git a/package.json b/package.json
index 0668bea..bc46c90 100644
--- a/package.json
+++ b/package.json
@@ -16,7 +16,7 @@
     "backup",
     "streaming",
     "s3",
-    "stream",
+    "stream"
   ],
   "bin": {
     "dynamo-backup-to-s3": "./bin/dynamo-backup-to-s3"
@@ -28,7 +28,7 @@
     "lodash": "^3.10.1",
     "moment": "^2.10.6",
     "moment-range": "^2.0.3",
-    "s3-streaming-upload": "^0.2.1",
+    "s3-streaming-upload": "^0.2.1"
   },
   "engines": {
     "node": ">=0.10.0"

From a4616725ad3636aef2883e9ec8ceb5b93ea6d97a Mon Sep 17 00:00:00 2001
From: Matt Geskey
Date: Thu, 21 Sep 2017 14:55:24 -0700
Subject: [PATCH 13/17] updated package.js for release

---
 package.json | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/package.json b/package.json
index bc46c90..7e7a365 100644
--- a/package.json
+++ b/package.json
@@ -1,14 +1,15 @@
 {
   "name": "dynamo-backup-to-s3",
-  "version": "0.6.1",
+  "version": "0.7.0",
   "author": "Dylan Lingelbach (https://github.com/dylanlingelbach)",
   "license": "MIT",
   "contributors": [
-    "Steven de Salas (https://github.com/sdesalas)"
+    "Steven de Salas (https://github.com/sdesalas)",
+    "Matt Geskey (https://github.com/vitrvvivs)"
   ],
   "repository": {
     "type": "git",
-    "url": "https://github.com/markitx/dynamo-backup-to-s3.git"
+    "url": "https://github.com/celtra/dynamo-backup-to-s3.git"
   },
   "keywords": [
     "dynamo",
@@ -19,7 +20,8 @@
     "stream"
   ],
   "bin": {
-    "dynamo-backup-to-s3": "./bin/dynamo-backup-to-s3"
+    "dynamo-backup-to-s3": "./bin/dynamo-backup-to-s3",
+    "dynamo-restore-from-s3": "./bin/dynamo-restore-from-s3"
   },
   "dependencies": {
     "async": "^1.5.0",
@@ -34,7 +36,7 @@
     "node": ">=0.10.0"
   },
   "bugs": {
-    "url": "https://github.com/markitx/dynamo-backup-to-s3/issues"
+    "url": "https://github.com/celtra/dynamo-backup-to-s3/issues"
   },
   "devDependencies": {
     "mocha": "^2.3.4"
From c9aaecae3ba2b523f8432e968c5e2fc225827343 Mon Sep 17 00:00:00 2001
From: Matt Geskey
Date: Mon, 25 Sep 2017 14:08:48 -0700
Subject: [PATCH 14/17] Separated read and send loops; no unprocessed items
 when requests < 1s

This is 60% faster on my test tables.

---
 bin/dynamo-restore-from-s3 | 10 ++--
 lib/dynamo-restore.js      | 95 ++++++++++++++++----------------------
 2 files changed, 44 insertions(+), 61 deletions(-)

diff --git a/bin/dynamo-restore-from-s3 b/bin/dynamo-restore-from-s3
index c9a120c..8576532 100755
--- a/bin/dynamo-restore-from-s3
+++ b/bin/dynamo-restore-from-s3
@@ -55,9 +55,9 @@ function translate(contentLength) {
     var kb = contentLength / 1024,
         mb = kb / 1024,
         gb = mb / 1024;
-    return gb > 5 ? gb.toFixed(0) + ' Gb' :
-        (mb > 5 ? mb.toFixed(0) + 'Mb' :
-            kb.toFixed(0) + 'Kb');
+    return gb > 5 ? gb.toFixed(1) + ' GiB' :
+        (mb > 5 ? mb.toFixed(1) + 'MiB' :
+            kb.toFixed(0) + 'KiB');
 }

 // Define events
@@ -75,8 +75,8 @@ dynamoRestore.on('start-download', function(streamMeta) {
     console.log('Starting download. %s remaining...', translate(streamMeta.ContentLength));
 });

-dynamoRestore.on('send-batch', function(batches, requests, streamMeta) {
-    console.log('Batch sent. %d/%d in flight. %s remaining to download...', requests, batches, translate(streamMeta.RemainingLength));
+dynamoRestore.on('send-batch', function(batches, queue, streamMeta) {
+    console.log('%d batches in flight. %d items in queue. %s remaining to download...', batches, queue, translate(streamMeta.RemainingLength));
 });

 // Start Process

diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index 29595c5..77cbb72 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -13,10 +13,12 @@ var AWS = require('aws-sdk');
 var events = require('events');
 var readline = require('readline');
 var DYNAMO_CHUNK_SIZE = 25;
+var BUFFER_ITEMS;
+var INTERVAL;

 function DynamoRestore(options) {
     options = options || {};
-    options.concurrency = options.concurrency && options.concurrency / DYNAMO_CHUNK_SIZE || 200;
+    options.concurrency = options.concurrency && options.concurrency / DYNAMO_CHUNK_SIZE || 20;
     options.minConcurrency = 1;
     options.maxConcurrency = options.concurrency;
     options.readcapacity = options.readcapacity || 5;
@@ -24,7 +26,7 @@ function DynamoRestore(options) {
     options.stopOnFailure = options.stopOnFailure || false;
     options.awsKey = options.awsKey || process.env.AWS_ACCESS_KEY_ID;
     options.awsSecret = options.awsSecret || process.env.AWS_SECRET_ACCESS_KEY;
-    options.awsRegion = options.awsRegion || process.env.AWS_DEFAULT_REGION || 'ap-southeast-2';
+    options.awsRegion = options.awsRegion || process.env.AWS_DEFAULT_REGION || 'us-east-1';

     AWS.config.update({
         accessKeyId: options.awsKey,
@@ -34,10 +36,12 @@ function DynamoRestore(options) {

     this.options = options;
     this.options.localfile = false;
-    this.batches = this.batches || [];
-    this.requests = this.requests || 0;
-    this.requestItems = this.requestItems || [];
+    this.requests = 0;
+    this.drain = false;
+    this.requestItems = new Array();
     this.dynamodb = new AWS.DynamoDB();
+    INTERVAL = 1000 / options.maxConcurrency;
+    BUFFER_ITEMS = options.concurrency * DYNAMO_CHUNK_SIZE * 5; // 5s of consumption
 }

 // Stick to prototypal inheritance. While this can be done differently
@@ -145,8 +149,7 @@ DynamoRestore.prototype._startDownload = function() {
         .on('line', self._processLine.bind(self))
         .on('close', function() {
             self.emit('finish-download');
-            self._sendBatch(self.requestItems.splice(0, DYNAMO_CHUNK_SIZE));
-            self._finishBatches();
+            this.drain = true;
         });
         self.readline.meta = meta;
         self.readline.meta.RemainingLength = meta.ContentLength;
@@ -174,14 +177,13 @@ DynamoRestore.prototype._startDownload = function() {
             // All good, start downloading
             createReadline(downloadStream, meta);
         });
-    }
+    };
 };

 DynamoRestore.prototype._processFirstLine = function(line) {
     item = JSON.parse(line);
-    this.template = item;
-    this._extractSchema(this.template);
-    this._pushItem(this.template);
+    this.template = this._extractSchema(item);
+    this.requestItems.push({ PutRequest: { Item: item } });
 }

 DynamoRestore.prototype._processLine = function(line) {
@@ -191,18 +193,11 @@ DynamoRestore.prototype._processLine = function(line) {
         this._processFirstLine(line);
     } else {
         item = JSON.parse(line);
-        this._pushItem(item);
-        this.readline.meta.RemainingLength -= line.length + 1;
+        this.requestItems.push({ PutRequest: { Item: item } });
     }
+    this.readline.meta.RemainingLength -= line.length + 1;
 };

-DynamoRestore.prototype._pushItem = function(item) {
-    this.requestItems.push({ PutRequest: { Item: item } });
-    if (this.requestItems.length >= DYNAMO_CHUNK_SIZE) {
-        this._sendBatch(this.requestItems.splice(0, DYNAMO_CHUNK_SIZE))
-    }
-}
-
 DynamoRestore.prototype._extractSchema = function(template) {
     var partitionkey = this.options.partitionkey;
     if (partitionkey) {
@@ -237,6 +232,7 @@ DynamoRestore.prototype._extractSchema = function(template) {
         }
         this.options.sortkeytype = Object.keys(template[sortkey]).pop();
     }
+    return template;
 };

 DynamoRestore.prototype._createTable = function(callback) {
@@ -291,37 +287,37 @@ DynamoRestore.prototype._checkTableReady = function(error, data) {
         // All ready, now we can start inserting records
         this.tableready = true;
         this.readline.resume();
-        while (this.requestItems.length) { this._sendBatch(this.requestItems.splice(0, DYNAMO_CHUNK_SIZE)); }
+        this._sendBatchLoop();
     } else {
         setTimeout(dynamodb.describeTable.bind(dynamodb, { TableName: data.Table.TableName }, this._checkTableReady.bind(this)), 1000);
     }
 };

-DynamoRestore.prototype._sendBatch = function(items) {
-    // Prepare
-    var params = { RequestItems: {} },
-        dynamo = this.dynamodb,
-        options = this.options,
-        self = this;
-    var interval = 1000 / options.maxConcurrency;
-    params.RequestItems[options.table] = items;
+DynamoRestore.prototype._sendBatchLoop = function() {
+    if (this.requestItems.length >= DYNAMO_CHUNK_SIZE || this.drain) {
+        // Prepare
+        var params = { RequestItems: {} },
+            options = this.options,
+            items = this.requestItems.splice(0, DYNAMO_CHUNK_SIZE);
+            self = this;
+        params.RequestItems[options.table] = items;

-    // Send
-    var waitForConcurrency = function() {
+        // Send
         if (self.requests < options.concurrency) {
-            self.emit('send-batch', options.concurrency, self.requests, self.readline.meta);
             self.requests++;
+            self.emit('send-batch', self.requests, self.requestItems.length, self.readline.meta);
+            self.dynamodb.batchWriteItem(params, function(error, data) {
                 self.requests--;
                 if (error) {
                     self.emit('warning', 'Error processing batch, putting back in the queue.');
-                    self.RequestItems.concat(items);
+                    self.requestItems.push.apply(self.requestItems, items);
+                    return;
                 }
                 var unprocessedItems = data && data.UnprocessedItems && data.UnprocessedItems[options.table] || [];
                 if (unprocessedItems.length) {
                     // Retry unprocessed items
                     self.emit('warning', unprocessedItems.length + ' unprocessed items. Add to queue and back off a bit.');
-                    self.requestItems.concat(unprocessedItems);
+                    self.requestItems.push.apply(self.requestItems, unprocessedItems);
                     // Back off a bit..
                     if (options.concurrency > options.minConcurrency) {
                         options.concurrency--;
@@ -333,32 +329,19 @@ DynamoRestore.prototype._sendBatchLoop = function() {
                     }
                 }
                 // Continue downloading data...
-                self.readline.resume();
                 self.emit('finish-batch', self.requests);
             });
-        } else {
-            self.readline.pause();
-            setTimeout(waitForConcurrency, 100);
-        }
-    };
-    waitForConcurrency();
-};
-
-DynamoRestore.prototype._finishBatches = function() {
-    if (!this.requestItems.length) {
-        if (this.requests === 0) {
-            // Finish only if there is nothing left to wait for.
-            this.emit('finish');
-            return;
         }
-    } else {
-        // Send remaining batches
-        if (this.requests < this.options.concurrency) {
-            this._sendBatch(this.requestItems.splice(0, DYNAMO_CHUNK_SIZE));
-        }
+    } if (this.requestItems.length >= BUFFER_ITEMS) {
+        this.readline.pause();
+    } else {
+        this.readline.resume();
+    }
+    if (this.drain && this.requestItems.length === 0 && this.requests === 0) {
+        this.emit('finish');
+    } else {
+        setTimeout(this._sendBatchLoop.bind(this), INTERVAL);
     }
-    // Repeat until finished
-    setTimeout(this._finishBatches.bind(this), 200);
 };
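The structure the patch above moves to, reduced to its skeleton: the reader only
fills a queue, a timer-driven loop drains it, and a high-water mark on the queue
is the only coupling between the two. A minimal sketch of that pattern
(hypothetical names; `rl` is a readline interface and `sendBatch(items, done)`
is a stub standing in for BatchWriteItem):

    // Producer/consumer skeleton with a high-water mark.
    var CHUNK = 25;
    var INTERVAL = 50;                  // 1000 / maxConcurrency ms between sends
    var BUFFER_ITEMS = CHUNK * 20 * 5;  // roughly five seconds of consumption
    var queue = [], inFlight = 0, drain = false;

    rl.on('line', function (line) { queue.push(JSON.parse(line)); });
    rl.on('close', function () { drain = true; });

    (function loop() {
        if (queue.length >= CHUNK || (drain && queue.length)) {
            inFlight++;
            sendBatch(queue.splice(0, CHUNK), function () { inFlight--; });
        }
        // Backpressure lives here, not in the reader:
        if (queue.length >= BUFFER_ITEMS) rl.pause(); else rl.resume();
        if (drain && !queue.length && !inFlight) return console.log('finished');
        setTimeout(loop, INTERVAL);
    })();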
From d75a0acf871834d2b91cfc8eb2f9c5e2c491e6cf Mon Sep 17 00:00:00 2001
From: Matt Geskey
Date: Mon, 25 Sep 2017 15:01:30 -0700
Subject: [PATCH 15/17] Minor fixes

---
 bin/dynamo-restore-from-s3 | 2 +-
 lib/dynamo-restore.js      | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/bin/dynamo-restore-from-s3 b/bin/dynamo-restore-from-s3
index 8576532..184d778 100755
--- a/bin/dynamo-restore-from-s3
+++ b/bin/dynamo-restore-from-s3
@@ -10,7 +10,7 @@ program
     .option('-s, --source [path]', 'Full S3 path to a JSON backup file (Required)')
     .option('-t, --table [name]', 'Name of the Dynamo Table to restore to (Required)')
     .option('-o, --overwrite', 'Table already exists, skip auto-create. Default is false.')
-    .option('-c, --concurrency ', 'Number of concurrent requests & dynamo capacity units. Defaults to 200.')
+    .option('-c, --concurrency ', 'Number of concurrent requests & dynamo capacity units. Due to default partitioning, is useless above 1000. Defaults to 500.')
     .option('-pk, --partitionkey [columnname]', 'Name of Primary Partition Key. If not provided will try determine from backup.')
     .option('-sk, --sortkey [columnname]', 'Name of Secondary Sort Key. Ignored unless --partitionkey is provided.')
     .option('-rc, --readcapacity ', 'Read Units for new table (when finished). Default is 5.')

diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index 77cbb72..27e55f0 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -253,7 +253,7 @@ DynamoRestore.prototype._createTable = function(callback) {
         }],
         ProvisionedThroughput: {
             ReadCapacityUnits: options.readcapacity,
-            WriteCapacityUnits: options.concurrency // Need this high for pumping data, but will reduce it later.
+            WriteCapacityUnits: options.concurrency * DYNAMO_CHUNK_SIZE // Need this high for pumping data, but will reduce it later.
         }
     };
     if (options.sortkey) {

From 86df1e0db59de47fa22b97c1386dc0af30d2f625 Mon Sep 17 00:00:00 2001
From: Matt Geskey
Date: Mon, 25 Sep 2017 16:53:30 -0700
Subject: [PATCH 16/17] covered some edge cases

---
 lib/dynamo-restore.js | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index 27e55f0..db72951 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -294,7 +294,7 @@ DynamoRestore.prototype._checkTableReady = function(error, data) {
 };

 DynamoRestore.prototype._sendBatchLoop = function() {
-    if (this.requestItems.length >= DYNAMO_CHUNK_SIZE || this.drain) {
+    if (this.requestItems.length >= DYNAMO_CHUNK_SIZE || this.drain && this.requestItems.length) {
         // Prepare
         var params = { RequestItems: {} },
             options = this.options,
@@ -331,8 +331,11 @@ DynamoRestore.prototype._sendBatchLoop = function() {
                 // Continue downloading data...
                 self.emit('finish-batch', self.requests);
             });
+        } else {
+            self.requestItems.push.apply(self.requestItems, items);
         }
-    } if (this.requestItems.length >= BUFFER_ITEMS) {
+    }
+    if (this.requestItems.length >= BUFFER_ITEMS) {
         this.readline.pause();
     } else {
         this.readline.resume();

From 1f2b39d3d53c59b9b5f340d21288670d12058e3e Mon Sep 17 00:00:00 2001
From: Matt Geskey
Date: Tue, 26 Sep 2017 09:33:27 -0700
Subject: [PATCH 17/17] I did not understand this

---
 lib/dynamo-restore.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index db72951..dda8b39 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -149,7 +149,7 @@ DynamoRestore.prototype._startDownload = function() {
         .on('line', self._processLine.bind(self))
         .on('close', function() {
             self.emit('finish-download');
-            this.drain = true;
+            self.drain = true;
        });
         self.readline.meta = meta;
         self.readline.meta.RemainingLength = meta.ContentLength;
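A worked example of the capacity sizing patch 15 settles on. One DynamoDB write
capacity unit covers one write per second for an item up to 1 KB, so the table
must absorb the full send rate while the restore runs (the concurrency value
below is illustrative):

    // Illustrative sizing, assuming items of at most 1 KB each.
    var concurrency = 20;                 // batches allowed in flight
    var DYNAMO_CHUNK_SIZE = 25;           // items per BatchWriteItem
    var writeCapacityUnits = concurrency * DYNAMO_CHUNK_SIZE;  // 500 WCU
    // 500 WCU absorbs 20 full batches per second, which matches the
    // send loop's 1000 / maxConcurrency ms interval.
    console.log(writeCapacityUnits);      // 500

Patch 17's one-line change matters for the same loop: inside the plain
function() passed to .on('close', ...), `this` is not the DynamoRestore
instance, so `this.drain = true` set the flag on the wrong object and the
finish condition in _sendBatchLoop could never fire; `self.drain` writes to
the captured instance instead.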