diff --git a/bin/dynamo-restore-from-s3 b/bin/dynamo-restore-from-s3
index f2b8dd0..184d778 100755
--- a/bin/dynamo-restore-from-s3
+++ b/bin/dynamo-restore-from-s3
@@ -10,7 +10,7 @@ program
     .option('-s, --source [path]', 'Full S3 path to a JSON backup file (Required)')
     .option('-t, --table [name]', 'Name of the Dynamo Table to restore to (Required)')
     .option('-o, --overwrite', 'Table already exists, skip auto-create. Default is false.')
-    .option('-c, --concurrency <requestcount>', 'Number of concurrent requests & dynamo capacity units. Defaults to 200.')
+    .option('-c, --concurrency <requestcount>', 'Number of concurrent requests & dynamo capacity units. Due to default partitioning, values above 1000 have no effect. Defaults to 500.')
    .option('-pk, --partitionkey [columnname]', 'Name of Primary Partition Key. If not provided will try determine from backup.')
     .option('-sk, --sortkey [columnname]', 'Name of Secondary Sort Key. Ignored unless --partitionkey is provided.')
     .option('-rc, --readcapacity <units>', 'Read Units for new table (when finished). Default is 5.')
@@ -55,9 +55,9 @@ function translate(contentLength) {
     var kb = contentLength / 1024,
         mb = kb / 1024,
         gb = mb / 1024;
-    return gb > 5 ? gb.toFixed(0) + ' Gb' :
-        (mb > 5 ? mb.toFixed(0) + 'Mb' :
-            kb.toFixed(0) + 'Kb');
+    return gb > 5 ? gb.toFixed(1) + ' GiB' :
+        (mb > 5 ? mb.toFixed(1) + ' MiB' :
+            kb.toFixed(0) + ' KiB');
 }
 
 // Define events
@@ -75,8 +75,8 @@ dynamoRestore.on('start-download', function(streamMeta) {
     console.log('Starting download. %s remaining...', translate(streamMeta.ContentLength));
 });
 
-dynamoRestore.on('send-batch', function(batches, requests, streamMeta) {
-    console.log('Batch sent. %d in flight. %s remaining to download...', requests, translate(streamMeta.RemainingLength));
+dynamoRestore.on('send-batch', function(batches, queue, streamMeta) {
+    console.log('%d batches in flight. %d items in queue. %s remaining to download...', batches, queue, translate(streamMeta.RemainingLength));
 });
 
 // Start Process
@@ -87,4 +87,4 @@ dynamoRestore.run(function() {
     seconds = Math.floor((diff % 1000 * 60) / 1000);
     console.log('Done! Process completed in %s minutes %s seconds.', minutes, seconds);
     process.exit(0);
-});
\ No newline at end of file
+});
diff --git a/lib/dynamo-restore.js b/lib/dynamo-restore.js
index b341fb8..dda8b39 100755
--- a/lib/dynamo-restore.js
+++ b/lib/dynamo-restore.js
@@ -2,20 +2,23 @@
  * lib/dynamo-restore.js
  *
  * By Steven de Salas
- * 
+ *
  * AWS Restore to DynamoDB. Streams an S3 backup to a new DynamoDB table.
 *
 */
 
+var fs = require('fs');
 var URL = require('url');
 var util = require('util');
 var AWS = require('aws-sdk');
 var events = require('events');
 var readline = require('readline');
 
 var DYNAMO_CHUNK_SIZE = 25;
+var BUFFER_ITEMS;
+var INTERVAL;
 
 function DynamoRestore(options) {
     options = options || {};
-    options.concurrency = options.concurrency || 200;
+    options.concurrency = options.concurrency && options.concurrency / DYNAMO_CHUNK_SIZE || 20; // capacity units -> concurrent 25-item batches
     options.minConcurrency = 1;
     options.maxConcurrency = options.concurrency;
     options.readcapacity = options.readcapacity || 5;
@@ -23,7 +26,7 @@ function DynamoRestore(options) {
     options.stopOnFailure = options.stopOnFailure || false;
     options.awsKey = options.awsKey || process.env.AWS_ACCESS_KEY_ID;
     options.awsSecret = options.awsSecret || process.env.AWS_SECRET_ACCESS_KEY;
-    options.awsRegion = options.awsRegion || process.env.AWS_DEFAULT_REGION || 'ap-southeast-2';
+    options.awsRegion = options.awsRegion || process.env.AWS_DEFAULT_REGION || 'us-east-1';
 
     AWS.config.update({
         accessKeyId: options.awsKey,
@@ -32,10 +35,16 @@ function DynamoRestore(options) {
     });
 
     this.options = options;
+    this.options.localfile = false;
+    this.requests = 0;
+    this.drain = false;
+    this.requestItems = [];
     this.dynamodb = new AWS.DynamoDB();
+    INTERVAL = 1000 / options.maxConcurrency; // ms between send-loop ticks
+    BUFFER_ITEMS = options.concurrency * DYNAMO_CHUNK_SIZE * 5; // 5s of consumption
 }
 
-// Stick to prototypal inheritance. While this can be done differently 
+// Stick to prototypal inheritance. While this can be done differently
 // in ES6 we'd be making package unusable for older engines (0.10.x->0.12.x)
 util.inherits(DynamoRestore, events.EventEmitter);
 
@@ -43,19 +52,21 @@ DynamoRestore.prototype.run = function(finishCallback) {
     this._validateS3Backup(this.options);
     this._validateTable(this.options);
     this._startDownload();
+
+    var self = this;
     // Exit program by default if there are no error listeners attached.
-    this.on('error', (function(message) {
+    this.on('error', function(message) {
         if (finishCallback) {
             finishCallback(message);
         }
-        if (this.listeners('error').length <= 1) {
+        if (self.listeners('error').length <= 1) {
             throw new Error(message);
         }
-    }).bind(this));
+    });
     // Finish off by updating write capacity to end-state (if needed)
-    this.on('finish', (function() {
-        var dynamodb = this.dynamodb,
-            options = this.options;
+    this.on('finish', function() {
+        var dynamodb = self.dynamodb,
+            options = self.options;
         // Do we need to update write capacity?
         if (options.writecapacity) {
             dynamodb.updateTable({
@@ -68,14 +79,18 @@ DynamoRestore.prototype.run = function(finishCallback) {
         } else {
             finishCallback();
         }
-    }).bind(this));
+    });
 };
 
 DynamoRestore.prototype._validateS3Backup = function(options) {
     // Check S3 URI
     var url = URL.parse(options.source);
     if (!url || url.protocol !== 's3:') {
-        return this.emit('error', 'Please provide an s3 URI as file source (ie s3://mybucketname/folder/mydynamotable.json)');
+        if (fs.existsSync(options.source)) {
+            return this.options.localfile = true;
+        } else {
+            return this.emit('error', 'Please provide an s3 URI or local file path as source (ie s3://mybucketname/folder/mydynamotable.json)');
+        }
     }
     if (!url.pathname || !url.hostname || url.search || url.hash || url.auth) {
         return this.emit('error', 'Please provide a simple s3 URI as file source (ie s3://mybucketname/folder/mydynamotable.json)');
@@ -122,75 +137,65 @@ DynamoRestore.prototype._checkTableExists = function(error, data) {
 };
 
 DynamoRestore.prototype._startDownload = function() {
-    var s3 = new AWS.S3();
-    var params = {
-        Bucket: this.options.s3bucket,
-        Key: this.options.s3path
-    };
-    // First determine if file exists in s3
-    s3.headObject(params, (function(error, meta) {
-        if (error) {
-            if (error.code === 'NotFound') this.emit('error', util.format('Could not find file in s3. %s', this.options.source));
-            else this.emit('error', util.format('Error downloading file from s3: %s', error));
-            return;
-        }
-        var downloadStream = s3.getObject(params).createReadStream();
-        downloadStream.pause();
-        // All good, start downloading
-        this.emit('start-download', meta);
-        this.readline = readline.createInterface({
+    var options = this.options;
+    var self = this;
+
+    var createReadline = function(stream, meta) {
+        self.emit('start-download', meta);
+        self.readline = readline.createInterface({
             terminal: false,
-            input: downloadStream
+            input: stream
         })
-            .on('line', this._processLine.bind(this))
-            .on('close', (function() {
-                this.emit('finish-download');
-                this.batches.push({
-                    items: this.requestItems.splice(0, DYNAMO_CHUNK_SIZE),
-                    attempts: 0
-                });
-                this._finishBatches();
-            }).bind(this));
-        this.readline.meta = meta;
-        this.readline.meta.RemainingLength = meta.ContentLength;
-    }).bind(this));
+            .on('line', self._processLine.bind(self))
+            .on('close', function() {
+                self.emit('finish-download');
+                self.drain = true;
+            });
+        self.readline.meta = meta;
+        self.readline.meta.RemainingLength = meta.ContentLength;
+    };
+
+    if (options.localfile) {
+        var fileStream = fs.createReadStream(options.source);
+        var meta = { ContentLength: fs.statSync(options.source).size };
+        createReadline(fileStream, meta);
+    } else {
+        var s3 = new AWS.S3();
+        var params = {
+            Bucket: self.options.s3bucket,
+            Key: self.options.s3path
+        };
+        // First determine if file exists in s3
+        s3.headObject(params, function(error, meta) {
+            if (error) {
+                if (error.code === 'NotFound') self.emit('error', util.format('Could not find file in s3. %s', self.options.source));
+                else self.emit('error', util.format('Error downloading file from s3: %s', error));
+                return;
+            }
+            var downloadStream = s3.getObject(params).createReadStream();
+            downloadStream.pause();
+            // All good, start downloading
+            createReadline(downloadStream, meta);
+        });
+    }
 };
 
+DynamoRestore.prototype._processFirstLine = function(line) {
+    var item = JSON.parse(line);
+    this.template = this._extractSchema(item);
+    this.requestItems.push({ PutRequest: { Item: item } });
+};
+
 DynamoRestore.prototype._processLine = function(line) {
-    this.batches = this.batches || [];
-    this.requests = this.requests || [];
-    this.requestItems = this.requestItems || [];
     // First Line?
     if (!this.template) {
-        // Use this to extract schema information
         this.readline.pause();
-        this.template = JSON.parse(line);
-        this._extractSchema(this.template);
+        this._processFirstLine(line);
+    } else {
+        var item = JSON.parse(line);
+        this.requestItems.push({ PutRequest: { Item: item } });
     }
-    // Keep tabs on how much data is being consumed
     this.readline.meta.RemainingLength -= line.length + 1;
-    // Create batches of 25 records each
-    this.requestItems.push({ PutRequest: { Item: JSON.parse(line) } });
-    if (this.requestItems.length === DYNAMO_CHUNK_SIZE) {
-        this.batches.push({
-            items: this.requestItems.splice(0, DYNAMO_CHUNK_SIZE),
-            attempts: 0
-        });
-    }
-    // Writing to Dynamo is usually slower than reading from S3,
-    // and we want to avoid clogging up memory or writing to disk.
-    // The list of batches waiting for DynamoDB to process would
-    // quickly get out of hand here, so an easy way around this is to
-    // stop reading from S3 when the number of requests in flight goes past a
-    // certain size, and then continue reading when the number is reduced.
-    if (this.requests.length >= this.options.concurrency) {
-        //Too many requests! Pausing download..
-        this.readline.pause();
-    }
-    // Send the next batch if we are not exceeding concurrency (ie max requests)
-    else if (this.tableready && this.batches.length) {
-        this._sendBatch();
-    }
 };
 
 DynamoRestore.prototype._extractSchema = function(template) {
@@ -210,7 +215,7 @@ DynamoRestore.prototype._extractSchema = function(template) {
     if (likelyCandidates.length === 0) {
         return this.emit('error', 'Fatal Error. Cannot determine --partitionkey from backup, please supply it manually.');
     } else {
-        // Pick the shortest one 
+        // Pick the shortest one
         partitionkey = likelyCandidates.sort(function(a, b) {
             return b.length - a.length;
         }).pop();
@@ -227,6 +232,7 @@ DynamoRestore.prototype._extractSchema = function(template) {
         }
         this.options.sortkeytype = Object.keys(template[sortkey]).pop();
     }
+    return template;
 };
 
 DynamoRestore.prototype._createTable = function(callback) {
@@ -247,7 +253,7 @@ DynamoRestore.prototype._createTable = function(callback) {
         }],
         ProvisionedThroughput: {
             ReadCapacityUnits: options.readcapacity,
-            WriteCapacityUnits: options.concurrency // Need this high for pumping data, but will reduce it later.
+            WriteCapacityUnits: options.concurrency * DYNAMO_CHUNK_SIZE // Need this high for pumping data, but will reduce it later.
         }
     };
     if (options.sortkey) {
@@ -281,83 +287,64 @@ DynamoRestore.prototype._checkTableReady = function(error, data) {
         // All ready, now we can start inserting records
         this.tableready = true;
         this.readline.resume();
-        while (this.batches.length) { this._sendBatch(); }
+        this._sendBatchLoop();
     } else {
         setTimeout(dynamodb.describeTable.bind(dynamodb, { TableName: data.Table.TableName }, this._checkTableReady.bind(this)), 1000);
     }
 };
 
-DynamoRestore.prototype._sendBatch = function() {
-    // Prepare
-    var params = { RequestItems: {} },
-        dynamo = this.dynamodb,
-        options = this.options;
-    batch = this.batches.shift();
-    params.RequestItems[options.table] = batch.items;
+DynamoRestore.prototype._sendBatchLoop = function() {
+    if (this.requestItems.length >= DYNAMO_CHUNK_SIZE || (this.drain && this.requestItems.length)) {
+        // Prepare
+        var params = { RequestItems: {} },
+            options = this.options,
+            items = this.requestItems.splice(0, DYNAMO_CHUNK_SIZE),
+            self = this;
+        params.RequestItems[options.table] = items;
 
-    // Send
-    this.requests.push(dynamo.batchWriteItem(params, (function(error, data) {
-        this.requests.shift();
-        if (error) {
-            // Problem? Check the number of attempts
-            if (batch.attempts > options.concurrency) {
-                if (options.stopOnFailure) {
-                    return this.emit('error', 'Fatal Error. Failed to upload batch. Ending process. \n' + JSON.stringify(batch));
+        // Send
+        if (self.requests < options.concurrency) {
+            self.requests++;
+            self.emit('send-batch', self.requests, self.requestItems.length, self.readline.meta);
+            self.dynamodb.batchWriteItem(params, function(error, data) {
+                self.requests--;
+                if (error) {
+                    self.emit('warning', 'Error processing batch, putting back in the queue.');
+                    self.requestItems.push.apply(self.requestItems, items);
+                    return;
+                }
+                var unprocessedItems = data && data.UnprocessedItems && data.UnprocessedItems[options.table] || [];
+                if (unprocessedItems.length) {
+                    // Retry unprocessed items
+                    self.emit('warning', unprocessedItems.length + ' unprocessed items. Add to queue and back off a bit.');
+                    self.requestItems.push.apply(self.requestItems, unprocessedItems);
+                    // Back off a bit..
+                    if (options.concurrency > options.minConcurrency) {
+                        options.concurrency--;
+                    }
                 } else {
-                    this.emit('warning', 'Failed to upload same batch too many times, removing from queue.. \n' + JSON.stringify(batch));
+                    // Successful upload, increase concurrency again..
+                    if (options.concurrency < options.maxConcurrency) {
+                        options.concurrency++;
+                    }
                 }
-            } else {
-                this.emit('warning', 'Error processing batch, putting back in the queue.');
-                batch.attempts++;
-                this.batches.push(batch);
-            }
-        }
-        var unprocessedItems = data && data.UnprocessedItems && data.UnprocessedItems[options.table] || [];
-        if (unprocessedItems.length) {
-            // Retry unprocessed items
-            this.emit('warning', unprocessedItems.length + ' unprocessed items. Add to queue and back off a bit.');
-            this.batches.push({
-                items: unprocessedItems,
-                attempts: batch.attempts + 1
+                // Continue downloading data...
+                self.emit('finish-batch', self.requests);
             });
-            // Back off a bit..
-            options.concurrency--;
-            if (options.concurrency < options.minConcurrency) {
-                options.concurrency = options.minConcurrency;
-            }
         } else {
-            // Successful upload, increase concurrency again..
-            options.concurrency++;
-            if (options.concurrency > options.maxConcurrency) {
-                options.concurrency = options.maxConcurrency;
-            }
-        }
-        // Continue downloading data...
-        if (this.tableready && this.requests.length < options.concurrency * 0.8) {
-            this.readline.resume();
-        }
-        this.emit('finish-batch', this.requests.length);
-    }).bind(this)));
-
-    // Notify
-    this.emit('send-batch', this.batches.length, this.requests.length, this.readline.meta);
-};
-
-DynamoRestore.prototype._finishBatches = function() {
-    if (!this.batches.length) {
-        if (!this.requests.length) {
-            // Finish only if there is nothing left to wait for.
-            this.emit('finish');
-            return;
+            self.requestItems.push.apply(self.requestItems, items); // concurrency cap reached, requeue the batch
         }
+    }
+    if (this.requestItems.length >= BUFFER_ITEMS) {
+        this.readline.pause();
     } else {
-        // Send remaining batches
-        if (this.requests.length < this.options.concurrency) {
-            this._sendBatch();
-        }
+        this.readline.resume();
+    }
+    if (this.drain && this.requestItems.length === 0 && this.requests === 0) {
+        this.emit('finish');
+    } else {
+        setTimeout(this._sendBatchLoop.bind(this), INTERVAL);
     }
-    // Repeat until finished
-    setTimeout(this._finishBatches.bind(this), 200);
 };
 
-module.exports = DynamoRestore;
\ No newline at end of file
+module.exports = DynamoRestore;
diff --git a/package.json b/package.json
index bc46c90..7e7a365 100644
--- a/package.json
+++ b/package.json
@@ -1,14 +1,15 @@
 {
   "name": "dynamo-backup-to-s3",
-  "version": "0.6.1",
+  "version": "0.7.0",
   "author": "Dylan Lingelbach (https://github.com/dylanlingelbach)",
   "license": "MIT",
   "contributors": [
-    "Steven de Salas (https://github.com/sdesalas)"
+    "Steven de Salas (https://github.com/sdesalas)",
+    "Matt Geskey (https://github.com/vitrvvivs)"
   ],
   "repository": {
     "type": "git",
-    "url": "https://github.com/markitx/dynamo-backup-to-s3.git"
+    "url": "https://github.com/celtra/dynamo-backup-to-s3.git"
   },
   "keywords": [
     "dynamo",
@@ -19,7 +20,8 @@
     "stream"
   ],
   "bin": {
-    "dynamo-backup-to-s3": "./bin/dynamo-backup-to-s3"
+    "dynamo-backup-to-s3": "./bin/dynamo-backup-to-s3",
+    "dynamo-restore-from-s3": "./bin/dynamo-restore-from-s3"
  },
   "dependencies": {
     "async": "^1.5.0",
@@ -34,7 +36,7 @@
     "node": ">=0.10.0"
   },
   "bugs": {
-    "url": "https://github.com/markitx/dynamo-backup-to-s3/issues"
+    "url": "https://github.com/celtra/dynamo-backup-to-s3/issues"
   },
   "devDependencies": {
     "mocha": "^2.3.4"
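
Note: the snippet below is a minimal usage sketch of the restore flow introduced in this diff, not part of the patch itself. Only the option names, events, and callback shape come from the code above; the require path, table name, and capacity values are illustrative assumptions.

    var DynamoRestore = require('./lib/dynamo-restore.js');

    var restore = new DynamoRestore({
        source: 's3://mybucketname/folder/mydynamotable.json', // a local file path also works now
        table: 'mydynamotable',
        concurrency: 500,   // capacity units; divided by DYNAMO_CHUNK_SIZE (25) into 20 concurrent batches
        readcapacity: 5,    // read units applied to the new table
        writecapacity: 10,  // write units the table is dialled back to once the restore finishes
        awsRegion: 'us-east-1'
    });

    // 'send-batch' now reports batches in flight and the size of the item queue
    restore.on('send-batch', function(batches, queue, meta) {
        console.log('%d batches in flight, %d items queued', batches, queue);
    });
    restore.on('warning', function(message) { console.warn(message); });

    restore.run(function(error) {
        if (error) { console.error(error); process.exit(1); }
        console.log('Restore complete.');
    });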