From 0b773f2ddc0401ac9a301de7e97688b79be39043 Mon Sep 17 00:00:00 2001 From: Anil Date: Tue, 19 Jan 2016 13:29:38 +0100 Subject: [PATCH] add reindex extension using scroll api --- .../lib/elasticsearch/extensions.rb | 1 + .../lib/elasticsearch/extensions/reindex.rb | 67 +++++++++++++++++++ .../test/reindex/unit/reindex_test.rb | 34 ++++++++++ 3 files changed, 102 insertions(+) create mode 100644 elasticsearch-extensions/lib/elasticsearch/extensions/reindex.rb create mode 100644 elasticsearch-extensions/test/reindex/unit/reindex_test.rb diff --git a/elasticsearch-extensions/lib/elasticsearch/extensions.rb b/elasticsearch-extensions/lib/elasticsearch/extensions.rb index c03e84514f..d41a69ee7a 100644 --- a/elasticsearch-extensions/lib/elasticsearch/extensions.rb +++ b/elasticsearch-extensions/lib/elasticsearch/extensions.rb @@ -1,5 +1,6 @@ require 'elasticsearch' require 'elasticsearch/extensions/version' +require 'elasticsearch/extensions/reindex' module Elasticsearch module Extensions diff --git a/elasticsearch-extensions/lib/elasticsearch/extensions/reindex.rb b/elasticsearch-extensions/lib/elasticsearch/extensions/reindex.rb new file mode 100644 index 0000000000..621b33ad86 --- /dev/null +++ b/elasticsearch-extensions/lib/elasticsearch/extensions/reindex.rb @@ -0,0 +1,67 @@ +module Elasticsearch + module Extensions + # Reindex using the scroll API. This moves data (not mappings) from one index + # to another. The target index can be on a different cluster. + # + # This is useful when updating mappings on existing fields in an index (e.g. with + # new analyzers). 
+ # + # @example Reindex all documents under a new index name + # + # Elasticsearch::Extensions::Reindex.new client: client, src_index: 'foo', target_index: 'bar' + # + # @see https://www.elastic.co/guide/en/elasticsearch/guide/current/reindex.html + # + # @option opts [Client] :client (*Required*) Client used to search and scroll the source index + # @option opts [String] :src_index (*Required*) Name of the index to read documents from + # @option opts [String] :target_index (*Required*) Name of the index to write documents to + # @option opts [Client] :target_client Client used for the bulk writes (defaults to `:client`) + # @option opts [Integer] :chunk_size Value sent as `size` with the initial search (default: 500) + # @option opts [String] :period How long Elasticsearch is asked to keep the scroll open between requests (default: '5m') + # + class Reindex + def initialize(opts = {}) + raise ArgumentError, "Required argument 'client' missing" unless opts[:client] + raise ArgumentError, "Required argument 'src_index' missing" unless opts[:src_index] + raise ArgumentError, "Required argument 'target_index' missing" unless opts[:target_index] + + valid_params = [ + :client, + :src_index, + :target_index, + :target_client, + :chunk_size, + :period + ] + + default_params = { + chunk_size: 500, + period: '5m' + } + + opts.each { |k, v| raise ArgumentError unless valid_params.include?(k) } + params = default_params.merge(opts) + client = params[:client] + target_client = params[:target_client] || client + + r = client.search(index: params[:src_index], + search_type: 'scan', + scroll: params[:period], + size: params[:chunk_size]) + + while r = client.scroll(scroll_id: r['_scroll_id'], scroll: params[:period]) do + docs = r['hits']['hits'] + break if docs.empty? 
+ body = docs.map do |doc| + doc['_index'] = params[:target_index] + doc['data'] = doc['_source'] + doc.delete('_score') + doc.delete('_source') + { index: doc } + end + target_client.bulk body: body + end + end + end + end +end diff --git a/elasticsearch-extensions/test/reindex/unit/reindex_test.rb b/elasticsearch-extensions/test/reindex/unit/reindex_test.rb new file mode 100644 index 0000000000..75efc2496b --- /dev/null +++ b/elasticsearch-extensions/test/reindex/unit/reindex_test.rb @@ -0,0 +1,34 @@ +require 'elasticsearch' +require 'test_helper' + +class Elasticsearch::Extensions::ReindexTest < Test::Unit::TestCase + context "reindex" do + should "scroll and bulk insert" do + @subject = Elasticsearch::Client.new + search_opts = { index: 'foo-index', + search_type: 'scan', + scroll: '5m', + size: 500 } + scroll_opts = { scroll_id: 'bar-id', + scroll: '5m' } + doc = { '_id' => 'quux', + '_type' => 'foo-type', + '_source' => { 'field1' => 'foobar' } } + scroll_rsp = { 'hits' => { 'hits' => [doc] } } + empty_scroll_rsp = { 'hits' => { 'hits' => [] } } + bulk_body = [{ index: { '_index' => 'bar-index', + '_type' => doc['_type'], + '_id' => doc['_id'], + 'data' => doc['_source'] } }] + + @subject.expects(:search).with(search_opts).returns({ '_scroll_id' => 'bar-id' }) + @subject.expects(:scroll).with(scroll_opts).returns(scroll_rsp) + @subject.expects(:scroll).with({ scroll_id: nil, scroll: '5m' }).returns(empty_scroll_rsp) + @subject.expects(:bulk).with(body: bulk_body).returns([]) + + Elasticsearch::Extensions::Reindex.new(client: @subject, + src_index: 'foo-index', + target_index: 'bar-index') + end + end +end