|
| 1 | +module Elasticsearch |
| 2 | + module Extensions |
| 3 | + # Reindex using the scroll api. This moves data (not mappings) from one index |
| 4 | + # to another. The target index can be on a different cluster. |
| 5 | + # |
| 6 | + # This is useful when updating mappings on existing fields in an index (eg with |
| 7 | + # new analyzers). |
| 8 | + # |
| 9 | + # @example Reindex all documents under a new index name |
| 10 | + # |
| 11 | + # Elasticsearch::Extensions::Reindex.new client: client, src_index: 'foo', target_index: 'bar' |
| 12 | + # |
| 13 | + # @see https://www.elastic.co/guide/en/elasticsearch/guide/current/reindex.html |
| 14 | + # |
| 15 | + # @option arguments [Client] :client (*Required*) |
| 16 | + # @option arguments [String] :src_index (*Required*) |
| 17 | + # @option arguments [String] :target_index (*Required*) |
| 18 | + # @option arguments [Client] :target_client |
| 19 | + # @option arguments [Int] :chunk_size |
| 20 | + # @option arguments [String] :period period to ask es to keep scroll buffer open '5m' |
| 21 | + # |
| 22 | + class Reindex |
| 23 | + def initialize(opts = {}) |
| 24 | + raise ArgumentError, "Required argument 'client' missing" unless opts[:client] |
| 25 | + raise ArgumentError, "Required argument 'src_index' missing" unless opts[:src_index] |
| 26 | + raise ArgumentError, "Required argument 'target_index' missing" unless opts[:target_index] |
| 27 | + |
| 28 | + valid_params = [ |
| 29 | + :client, |
| 30 | + :src_index, |
| 31 | + :target_index, |
| 32 | + :target_client, |
| 33 | + :chunk_size, |
| 34 | + :period |
| 35 | + ] |
| 36 | + |
| 37 | + default_params = { |
| 38 | + chunk_size: 500, |
| 39 | + period: '5m' |
| 40 | + } |
| 41 | + |
| 42 | + opts.each { |k, v| raise ArgumentError unless valid_params.include?(k) } |
| 43 | + params = default_params.merge(opts) |
| 44 | + client = params[:client] |
| 45 | + target_client = params[:target_client] || client |
| 46 | + |
| 47 | + r = client.search(index: params[:src_index], |
| 48 | + search_type: 'scan', |
| 49 | + scroll: params[:period], |
| 50 | + size: params[:chunk_size]) |
| 51 | + |
| 52 | + while r = client.scroll(scroll_id: r['_scroll_id'], scroll: params[:period]) do |
| 53 | + docs = r['hits']['hits'] |
| 54 | + break if docs.empty? |
| 55 | + body = docs.map do |doc| |
| 56 | + doc['_index'] = params[:target_index] |
| 57 | + doc['data'] = doc['_source'] |
| 58 | + doc.delete('_score') |
| 59 | + doc.delete('_source') |
| 60 | + { index: doc } |
| 61 | + end |
| 62 | + target_client.bulk body: body |
| 63 | + end |
| 64 | + end |
| 65 | + end |
| 66 | + end |
| 67 | +end |
0 commit comments