Skip to content

Commit 8dd5a04

Browse files
committed
[EXT] Refactored and improved the "Reindex" extension
This change improves the patch cbfa9d4, which added a Reindex extension to the extension library. * The code of the `Reindex` class has been changed to separate the initialization and the actual executing of the reindex action * The arguments to the class have been changed to be aligned with the "Reindex" API in the core Elasticsearch client (see 1cf5fb9) * The unit tests have been restructured and amended * Dedicated integration tests have been added to verify the behaviour against a real Elasticsearch cluster Example: reindex = Elasticsearch::Extensions::Reindex.new \ source: { index: 'test1', client: source_client }, target: { index: 'test2', client: target_client }, transform: lambda { |doc| doc['_source']['category'].upcase! }, batch_size: 100, refresh: true Related: cbfa9d4, 1cf5fb9 Closes #270
1 parent cbfa9d4 commit 8dd5a04

File tree

5 files changed

+341
-84
lines changed

5 files changed

+341
-84
lines changed

elasticsearch-extensions/lib/elasticsearch/extensions.rb

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
# encoding: utf-8
2+
13
require 'elasticsearch'
24
require 'elasticsearch/extensions/version'
3-
require 'elasticsearch/extensions/reindex'
45

56
module Elasticsearch
67
module Extensions

elasticsearch-extensions/lib/elasticsearch/extensions/reindex.rb

Lines changed: 148 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,158 @@
1+
# encoding: utf-8
2+
13
module Elasticsearch
24
module Extensions
3-
# Reindex using the scroll api. This moves data (not mappings) from one index
4-
# to another. The target index can be on a different cluster.
5-
#
6-
# This is useful when updating mappings on existing fields in an index (eg with
7-
# new analyzers).
8-
#
9-
# @example Reindex all documents under a new index name
5+
6+
# This module allows copying documents from one index/cluster to another one
107
#
11-
# Elasticsearch::Extensions::Reindex.new client: client, src_index: 'foo', target_index: 'bar'
8+
# When required together with the client, it will add the `reindex` method
129
#
13-
# @see https://www.elastic.co/guide/en/elasticsearch/guide/current/reindex.html
10+
# @see Reindex::Reindex.initialize
11+
# @see Reindex::Reindex#perform
1412
#
15-
# @option arguments [Client] :client (*Required*)
16-
# @option arguments [String] :src_index (*Required*)
17-
# @option arguments [String] :target_index (*Required*)
18-
# @option arguments [Client] :target_client
19-
# @option arguments [Int] :chunk_size
20-
# @option arguments [String] :period period to ask es to keep scroll buffer open '5m'
13+
# @see http://www.rubydoc.info/gems/elasticsearch-api/Elasticsearch/API/Actions#reindex-instance_method
2114
#
22-
class Reindex
23-
def initialize(opts = {})
24-
raise ArgumentError, "Required argument 'client' missing" unless opts[:client]
25-
raise ArgumentError, "Required argument 'src_index' missing" unless opts[:src_index]
26-
raise ArgumentError, "Required argument 'target_index' missing" unless opts[:target_index]
27-
28-
valid_params = [
29-
:client,
30-
:src_index,
31-
:target_index,
32-
:target_client,
33-
:chunk_size,
34-
:period
35-
]
36-
37-
default_params = {
38-
chunk_size: 500,
39-
period: '5m'
40-
}
41-
42-
opts.each { |k, v| raise ArgumentError unless valid_params.include?(k) }
43-
params = default_params.merge(opts)
44-
client = params[:client]
45-
target_client = params[:target_client] || client
46-
47-
r = client.search(index: params[:src_index],
48-
search_type: 'scan',
49-
scroll: params[:period],
50-
size: params[:chunk_size])
51-
52-
while r = client.scroll(scroll_id: r['_scroll_id'], scroll: params[:period]) do
53-
docs = r['hits']['hits']
54-
break if docs.empty?
55-
body = docs.map do |doc|
56-
doc['_index'] = params[:target_index]
57-
doc['data'] = doc['_source']
58-
doc.delete('_score')
59-
doc.delete('_source')
60-
{ index: doc }
15+
module Reindex
16+
17+
# Initialize a new instance of the Reindex class (shortcut)
18+
#
19+
# @see Reindex::Reindex.initialize
20+
#
21+
def new(arguments={})
22+
Reindex.new(arguments)
23+
end; extend self
24+
25+
module API
26+
# Copy documents from one index into another and refresh the target index
27+
#
28+
# @example
29+
# client.reindex source: { index: 'test1' }, target: { index: 'test2' }, refresh: true
30+
#
31+
# The method allows all the options as {Reindex::Reindex.new}.
32+
#
33+
# This method will be mixed into the Elasticsearch client's API, if available.
34+
#
35+
def reindex(arguments={})
36+
arguments[:source] ||= {}
37+
arguments[:source][:client] = self
38+
Reindex.new(arguments).perform
39+
end
40+
end
41+
42+
# Include the `reindex` method in the API and client, if available
43+
Elasticsearch::API::Actions.__send__ :include, API if defined?(Elasticsearch::API::Actions)
44+
Elasticsearch::Transport::Client.__send__ :include, API if defined?(Elasticsearch::Transport::Client) && defined?(Elasticsearch::API)
45+
46+
# Copy documents from one index into another
47+
#
48+
# @example Copy documents to another index
49+
#
50+
# client = Elasticsearch::Client.new
51+
# reindex = Elasticsearch::Extensions::Reindex.new \
52+
# source: { index: 'test1', client: client },
53+
# target: { index: 'test2' }
54+
#
55+
# reindex.perform
56+
#
57+
# @example Copy documents to a different cluster
58+
#
59+
# source_client = Elasticsearch::Client.new url: 'http://localhost:9200'
60+
# target_client = Elasticsearch::Client.new url: 'http://localhost:9250'
61+
#
62+
# reindex = Elasticsearch::Extensions::Reindex.new \
63+
# source: { index: 'test', client: source_client },
64+
# target: { index: 'test', client: target_client }
65+
# reindex.perform
66+
#
67+
# @example Transform the documents during re-indexing
68+
#
69+
# reindex = Elasticsearch::Extensions::Reindex.new \
70+
# source: { index: 'test1', client: client },
71+
# target: { index: 'test2' },
72+
# transform: lambda { |doc| doc['_source']['category'].upcase! }
73+
#
74+
# The reindexing process works by "scrolling" an index and sending
75+
# batches via the "Bulk" API to the target index/cluster
76+
#
77+
# @option arguments [String] :source The source index/cluster definition (*Required*)
78+
# @option arguments [String] :target The target index/cluster definition (*Required*)
79+
# @option arguments [Integer] :batch_size The size of the batch for scroll operation (Default: 1000)
80+
# @option arguments [String] :scroll The timeout for the scroll operation (Default: 5min)
81+
# @option arguments [Boolean] :refresh Whether to refresh the target index after
82+
# the operation is completed (Default: false)
83+
# @option arguments [Proc] :transform A block which will be executed for each document
84+
#
85+
# Be aware, that if you want to change the target index settings and/or mappings,
86+
# you have to do so in advance by using the "Indices Create" API.
87+
#
88+
# Note, that there is a native "Reindex" API in Elasticsearch 2.3.x and higer versions,
89+
# which will be more performant than the Ruby version.
90+
#
91+
# @see http://www.rubydoc.info/gems/elasticsearch-api/Elasticsearch/API/Actions#reindex-instance_method
92+
#
93+
class Reindex
94+
attr_reader :arguments
95+
96+
def initialize(arguments={})
97+
[
98+
[:source, :index],
99+
[:source, :client],
100+
[:target, :index]
101+
].each do |required_option|
102+
value = required_option.reduce(arguments) { |sum, o| sum = sum[o] ? sum[o] : {} }
103+
104+
raise ArgumentError,
105+
"Required argument '#{Hash[*required_option]}' missing" if \
106+
value.respond_to?(:empty?) ? value.empty? : value.nil?
107+
end
108+
109+
@arguments = {
110+
batch_size: 1000,
111+
scroll: '5m',
112+
refresh: false
113+
}.merge(arguments)
114+
115+
arguments[:target][:client] ||= arguments[:source][:client]
116+
end
117+
118+
# Performs the operation
119+
#
120+
# @return [Hash] A Hash with the information about the operation outcome
121+
#
122+
def perform
123+
output = { errors: 0 }
124+
125+
response = arguments[:source][:client].search(
126+
index: arguments[:source][:index],
127+
scroll: arguments[:scroll],
128+
size: arguments[:batch_size],
129+
search_type: 'scan',
130+
fields: ['_source', '_parent', '_routing', '_timestamp']
131+
)
132+
133+
while response = arguments[:source][:client].scroll(scroll_id: response['_scroll_id'], scroll: arguments[:scroll]) do
134+
documents = response['hits']['hits']
135+
break if documents.empty?
136+
137+
bulk = documents.map do |doc|
138+
doc['_index'] = arguments[:target][:index]
139+
140+
arguments[:target][:transform].call(doc) if arguments[:target][:transform]
141+
142+
doc['data'] = doc['_source']
143+
doc.delete('_score')
144+
doc.delete('_source')
145+
146+
{ index: doc }
147+
end
148+
149+
bulk_response = arguments[:target][:client].bulk body: bulk
150+
output[:errors] += bulk_response['items'].select { |k, v| k.values.first['error'] }.size
61151
end
62-
target_client.bulk body: body
152+
153+
arguments[:target][:client].indices.refresh index: arguments[:target][:index] if arguments[:refresh]
154+
155+
output
63156
end
64157
end
65158
end
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
require 'test_helper'
2+
require 'elasticsearch/extensions/reindex'
3+
4+
class Elasticsearch::Extensions::ReindexIntegrationTest < Elasticsearch::Test::IntegrationTestCase
5+
context "The Reindex extension" do
6+
setup do
7+
@port = (ENV['TEST_CLUSTER_PORT'] || 9250).to_i
8+
9+
@logger = ::Logger.new(STDERR)
10+
@logger.formatter = proc do |severity, datetime, progname, msg|
11+
color = case severity
12+
when /INFO/ then :green
13+
when /ERROR|WARN|FATAL/ then :red
14+
when /DEBUG/ then :cyan
15+
else :white
16+
end
17+
ANSI.ansi(severity[0] + ' ', color, :faint) + ANSI.ansi(msg, :white, :faint) + "\n"
18+
end
19+
20+
@client = Elasticsearch::Client.new host: "localhost:#{@port}", logger: @logger
21+
@client.indices.delete index: '_all'
22+
23+
@client.index index: 'test1', type: 'd', id: 1, body: { title: 'TEST 1', category: 'one' }
24+
@client.index index: 'test1', type: 'd', id: 2, body: { title: 'TEST 2', category: 'two' }
25+
@client.index index: 'test1', type: 'd', id: 3, body: { title: 'TEST 3', category: 'three' }
26+
@client.indices.refresh index: 'test1'
27+
28+
@client.cluster.health wait_for_status: 'yellow'
29+
end
30+
31+
should "copy documents from one index to another" do
32+
reindex = Elasticsearch::Extensions::Reindex.new \
33+
source: { index: 'test1', client: @client },
34+
target: { index: 'test2' },
35+
refresh: true
36+
37+
result = reindex.perform
38+
39+
assert_equal 0, result[:errors]
40+
assert_equal 3, @client.search(index: 'test2')['hits']['total']
41+
end
42+
43+
should "transform documents with a lambda" do
44+
reindex = Elasticsearch::Extensions::Reindex.new \
45+
source: { index: 'test1', client: @client },
46+
target: { index: 'test2', transform: lambda { |d| d['_source']['category'].upcase! } },
47+
refresh: true
48+
49+
result = reindex.perform
50+
51+
assert_equal 0, result[:errors]
52+
assert_equal 3, @client.search(index: 'test2')['hits']['total']
53+
assert_equal 'ONE', @client.get(index: 'test2', type: 'd', id: 1)['_source']['category']
54+
end
55+
56+
should "return the number of errors" do
57+
@client.indices.create index: 'test3', body: { mappings: { d: { properties: { category: { type: 'integer' } }}}}
58+
@client.cluster.health wait_for_status: 'yellow'
59+
60+
reindex = Elasticsearch::Extensions::Reindex.new \
61+
source: { index: 'test1', client: @client },
62+
target: { index: 'test3', transform: lambda { |d| d['_source']['category'].upcase!; d } },
63+
refresh: true
64+
65+
result = reindex.perform
66+
67+
assert_equal 3, result[:errors]
68+
assert_equal 0, @client.search(index: 'test3')['hits']['total']
69+
end
70+
71+
should "reindex via the API integration" do
72+
@client.reindex source: { index: 'test1' }, target: { index: 'test4' }
73+
74+
@client.indices.refresh index: 'test4'
75+
76+
assert_equal 3, @client.search(index: 'test4')['hits']['total']
77+
end
78+
end
79+
80+
end

0 commit comments

Comments
 (0)