Skip to content

Commit d9dd45e

Browse files
mashhursjsvd
andauthored
Tolerate the elasticsearch-ruby v8 client in integration tests. (#1208)
* elasticsearch-ruby client got updated to v8 in LS core. This plugin uses it in integration tests. This change tolerates both elasticsearch-ruby v7 and v8 client versions. * Fix the ILM spec issue where method was removed, restored internally. --------- Co-authored-by: João Duarte <[email protected]>
1 parent 3ef3c0c commit d9dd45e

File tree

11 files changed

+108
-117
lines changed

11 files changed

+108
-117
lines changed

Diff for: spec/es_spec_helper.rb

+34-22
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
require_relative './spec_helper'
22

33
require 'elasticsearch'
4-
require_relative "support/elasticsearch/api/actions/delete_ilm_policy"
5-
require_relative "support/elasticsearch/api/actions/get_alias"
6-
require_relative "support/elasticsearch/api/actions/put_alias"
7-
require_relative "support/elasticsearch/api/actions/get_ilm_policy"
8-
require_relative "support/elasticsearch/api/actions/put_ilm_policy"
94

105
require 'json'
116
require 'cabin'
127

8+
# remove this condition and support package once plugin starts consuming elasticsearch-ruby v8 client
9+
# in elasticsearch-ruby v7, ILM APIs were in a separate xpack gem, now directly available
10+
unless elastic_ruby_v8_client_available?
11+
require_relative "support/elasticsearch/api/actions/delete_ilm_policy"
12+
require_relative "support/elasticsearch/api/actions/get_ilm_policy"
13+
require_relative "support/elasticsearch/api/actions/put_ilm_policy"
14+
end
15+
1316
module ESHelper
1417
def get_host_port
1518
if ENV["INTEGRATION"] == "true"
@@ -20,8 +23,12 @@ def get_host_port
2023
end
2124

2225
def get_client
23-
Elasticsearch::Client.new(:hosts => [get_host_port]).tap do |client|
24-
allow(client).to receive(:verify_elasticsearch).and_return(true) # bypass client side version checking
26+
if elastic_ruby_v8_client_available?
27+
Elasticsearch::Client.new(:hosts => [get_host_port])
28+
else
29+
Elasticsearch::Client.new(:hosts => [get_host_port]).tap do |client|
30+
allow(client).to receive(:verify_elasticsearch).and_return(true) # bypass client side version checking
31+
end
2532
end
2633
end
2734

@@ -126,31 +133,36 @@ def get_cluster_settings(client)
126133
end
127134

128135
def get_policy(client, policy_name)
129-
client.get_ilm_policy(name: policy_name)
136+
if elastic_ruby_v8_client_available?
137+
client.index_lifecycle_management.get_lifecycle(policy: policy_name)
138+
else
139+
client.get_ilm_policy(name: policy_name)
140+
end
130141
end
131142

132143
def put_policy(client, policy_name, policy)
133-
client.put_ilm_policy({:name => policy_name, :body=> policy})
134-
end
135-
136-
def put_alias(client, the_alias, index)
137-
body = {
138-
"aliases" => {
139-
index => {
140-
"is_write_index"=> true
141-
}
142-
}
143-
}
144-
client.put_alias({name: the_alias, body: body})
144+
if elastic_ruby_v8_client_available?
145+
client.index_lifecycle_management.put_lifecycle({:policy => policy_name, :body=> policy})
146+
else
147+
client.put_ilm_policy({:name => policy_name, :body=> policy})
148+
end
145149
end
146150

147151
def clean_ilm(client)
148-
client.get_ilm_policy.each_key { |key| client.delete_ilm_policy(name: key) if key =~ /logstash-policy/ }
152+
if elastic_ruby_v8_client_available?
153+
client.index_lifecycle_management.get_lifecycle.each_key { |key| client.index_lifecycle_management.delete_lifecycle(policy: key) if key =~ /logstash-policy/ }
154+
else
155+
client.get_ilm_policy.each_key { |key| client.delete_ilm_policy(name: key) if key =~ /logstash-policy/ }
156+
end
149157
end
150158

151159
def supports_ilm?(client)
152160
begin
153-
client.get_ilm_policy
161+
if elastic_ruby_v8_client_available?
162+
client.index_lifecycle_management.get_lifecycle
163+
else
164+
client.get_ilm_policy
165+
end
154166
true
155167
rescue
156168
false

Diff for: spec/integration/outputs/delete_spec.rb

+4-4
Original file line numberDiff line numberDiff line change
@@ -39,25 +39,25 @@
3939
it "should ignore non-monotonic external version updates" do
4040
id = "ev2"
4141
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "index", "message" => "foo", "my_version" => 99)])
42-
r = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
42+
r = es.get(:index => 'logstash-delete', :id => id, :refresh => true)
4343
expect(r['_version']).to eq(99)
4444
expect(r['_source']['message']).to eq('foo')
4545

4646
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "delete", "message" => "foo", "my_version" => 98)])
47-
r2 = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
47+
r2 = es.get(:index => 'logstash-delete', :id => id, :refresh => true)
4848
expect(r2['_version']).to eq(99)
4949
expect(r2['_source']['message']).to eq('foo')
5050
end
5151

5252
it "should commit monotonic external version updates" do
5353
id = "ev3"
5454
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "index", "message" => "foo", "my_version" => 99)])
55-
r = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
55+
r = es.get(:index => 'logstash-delete', :id => id, :refresh => true)
5656
expect(r['_version']).to eq(99)
5757
expect(r['_source']['message']).to eq('foo')
5858

5959
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "delete", "message" => "foo", "my_version" => 100)])
60-
expect { es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true) }.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
60+
expect { es.get(:index => 'logstash-delete', :id => id, :refresh => true) }.to raise_error(get_expected_error_class)
6161
end
6262
end
6363
end

Diff for: spec/integration/outputs/ilm_spec.rb

+18-12
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@
102102
it 'should not install the default policy' do
103103
subject.register
104104
sleep(1)
105-
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
105+
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
106106
end
107107

108108
it 'should not write the ILM settings into the template' do
@@ -282,12 +282,12 @@
282282
subject.register
283283
sleep(1)
284284
expect(@es.indices.exists_alias(name: "logstash")).to be_truthy
285-
expect(@es.get_alias(name: "logstash")).to include("logstash-000001")
285+
expect(@es.indices.get_alias(name: "logstash")).to include("logstash-000001")
286286
end
287287
end
288288

289289
it 'should install it if it is not present' do
290-
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
290+
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
291291
subject.register
292292
sleep(1)
293293
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.not_to raise_error
@@ -298,7 +298,7 @@
298298
subject.register
299299
sleep(1)
300300
expect(@es.indices.exists_alias(name: "logstash")).to be_truthy
301-
expect(@es.get_alias(name: "logstash")).to include("logstash-#{todays_date}-000001")
301+
expect(@es.indices.get_alias(name: "logstash")).to include("logstash-#{todays_date}-000001")
302302
end
303303

304304
it 'should ingest into a single index' do
@@ -340,14 +340,14 @@
340340
let (:policy) { small_max_doc_policy }
341341

342342
before do
343-
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
343+
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
344344
put_policy(@es,ilm_policy_name, policy)
345345
end
346346

347347
it 'should not install the default policy if it is not used' do
348348
subject.register
349349
sleep(1)
350-
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
350+
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
351351
end
352352
end
353353

@@ -357,14 +357,14 @@
357357
let (:policy) { max_age_policy("1d") }
358358

359359
before do
360-
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
360+
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
361361
put_policy(@es,ilm_policy_name, policy)
362362
end
363363

364364
it 'should not install the default policy if it is not used' do
365365
subject.register
366366
sleep(1)
367-
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
367+
expect{get_policy(@es, LogStash::Outputs::ElasticSearch::DEFAULT_POLICY)}.to raise_error(get_expected_error_class)
368368
end
369369
end
370370

@@ -374,7 +374,7 @@
374374
subject.register
375375
sleep(1)
376376
expect(@es.indices.exists_alias(name: expected_index)).to be_truthy
377-
expect(@es.get_alias(name: expected_index)).to include("#{expected_index}-#{todays_date}-000001")
377+
expect(@es.indices.get_alias(name: expected_index)).to include("#{expected_index}-#{todays_date}-000001")
378378
end
379379

380380
it 'should write the ILM settings into the template' do
@@ -443,17 +443,18 @@
443443
subject.register
444444
sleep(1)
445445
expect(@es.indices.exists_alias(name: ilm_rollover_alias)).to be_truthy
446-
expect(@es.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001")
446+
expect(@es.indices.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001")
447447
end
448448

449449
context 'when the custom rollover alias already exists' do
450450
it 'should ignore the already exists error' do
451451
expect(@es.indices.exists_alias(name: ilm_rollover_alias)).to be_falsey
452-
put_alias(@es, "#{ilm_rollover_alias}-#{todays_date}-000001", ilm_rollover_alias)
452+
@es.indices.create(index: "#{ilm_rollover_alias}-#{todays_date}-000001")
453+
@es.indices.put_alias(name: ilm_rollover_alias, index: "#{ilm_rollover_alias}-#{todays_date}-000001")
453454
expect(@es.indices.exists_alias(name: ilm_rollover_alias)).to be_truthy
454455
subject.register
455456
sleep(1)
456-
expect(@es.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001")
457+
expect(@es.indices.get_alias(name: ilm_rollover_alias)).to include("#{ilm_rollover_alias}-#{todays_date}-000001")
457458
end
458459

459460
end
@@ -532,3 +533,8 @@
532533
end
533534

534535
end
536+
537+
def get_expected_error_class
538+
return Elastic::Transport::Transport::Errors::NotFound if elastic_ruby_v8_client_available?
539+
Elasticsearch::Transport::Transport::Errors::NotFound
540+
end

Diff for: spec/integration/outputs/index_version_spec.rb

+7-7
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@
3636

3737
it "should default to ES version" do
3838
subject.multi_receive([LogStash::Event.new("my_id" => "123", "message" => "foo")])
39-
r = es.get(:index => 'logstash-index', :type => doc_type, :id => "123", :refresh => true)
39+
r = es.get(:index => 'logstash-index', :id => '123', :refresh => true)
4040
expect(r["_version"]).to eq(1)
4141
expect(r["_source"]["message"]).to eq('foo')
4242
subject.multi_receive([LogStash::Event.new("my_id" => "123", "message" => "foobar")])
43-
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => "123", :refresh => true)
43+
r2 = es.get(:index => 'logstash-index', :id => '123', :refresh => true)
4444
expect(r2["_version"]).to eq(2)
4545
expect(r2["_source"]["message"]).to eq('foobar')
4646
end
@@ -63,33 +63,33 @@
6363
it "should respect the external version" do
6464
id = "ev1"
6565
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
66-
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
66+
r = es.get(:index => 'logstash-index', :id => id, :refresh => true)
6767
expect(r["_version"]).to eq(99)
6868
expect(r["_source"]["message"]).to eq('foo')
6969
end
7070

7171
it "should ignore non-monotonic external version updates" do
7272
id = "ev2"
7373
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
74-
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
74+
r = es.get(:index => 'logstash-index', :id => id, :refresh => true)
7575
expect(r["_version"]).to eq(99)
7676
expect(r["_source"]["message"]).to eq('foo')
7777

7878
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "98", "message" => "foo")])
79-
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
79+
r2 = es.get(:index => 'logstash-index', :id => id, :refresh => true)
8080
expect(r2["_version"]).to eq(99)
8181
expect(r2["_source"]["message"]).to eq('foo')
8282
end
8383

8484
it "should commit monotonic external version updates" do
8585
id = "ev3"
8686
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
87-
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
87+
r = es.get(:index => 'logstash-index', :id => id, :refresh => true)
8888
expect(r["_version"]).to eq(99)
8989
expect(r["_source"]["message"]).to eq('foo')
9090

9191
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "100", "message" => "foo")])
92-
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
92+
r2 = es.get(:index => 'logstash-index', :id => id, :refresh => true)
9393
expect(r2["_version"]).to eq(100)
9494
expect(r2["_source"]["message"]).to eq('foo')
9595
end

Diff for: spec/integration/outputs/painless_update_spec.rb

+11-10
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ def get_es_output( options={} )
2222
# This can fail if there are no indexes, ignore failure.
2323
@es.indices.delete(:index => "*") rescue nil
2424
@es.index(
25-
:index => 'logstash-update',
26-
:type => doc_type,
27-
:id => "123",
28-
:body => { :message => 'Test', :counter => 1 }
29-
)
25+
{
26+
:index => 'logstash-update',
27+
:id => '123',
28+
:body => { :message => 'Test', :counter => 1 },
29+
:refresh => true
30+
})
3031
@es.indices.refresh
3132
end
3233

@@ -46,7 +47,7 @@ def get_es_output( options={} )
4647
subject = get_es_output(plugin_parameters)
4748
subject.register
4849
subject.multi_receive([LogStash::Event.new("count" => 4 )])
49-
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "123", :refresh => true)
50+
r = @es.get(:index => 'logstash-update', :id => "123", :refresh => true)
5051
expect(r["_source"]["counter"]).to eq(5)
5152
end
5253
end
@@ -57,15 +58,15 @@ def get_es_output( options={} )
5758
subject = get_es_output({ 'document_id' => "456", 'upsert' => '{"message": "upsert message"}' })
5859
subject.register
5960
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
60-
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
61+
r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true)
6162
expect(r["_source"]["message"]).to eq('upsert message')
6263
end
6364

6465
it "should create new documents with event/doc as upsert" do
6566
subject = get_es_output({ 'document_id' => "456", 'doc_as_upsert' => true })
6667
subject.register
6768
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
68-
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
69+
r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true)
6970
expect(r["_source"]["message"]).to eq('sample message here')
7071
end
7172

@@ -82,7 +83,7 @@ def get_es_output( options={} )
8283
subject.register
8384

8485
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
85-
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
86+
r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true)
8687
expect(r["_source"]["message"]).to eq('upsert message')
8788
end
8889

@@ -91,7 +92,7 @@ def get_es_output( options={} )
9192
subject.register
9293
subject.multi_receive([LogStash::Event.new("counter" => 1)])
9394
@es.indices.refresh
94-
r = @es.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
95+
r = @es.get(:index => 'logstash-update', :id => "456", :refresh => true)
9596
expect(r["_source"]["counter"]).to eq(1)
9697
end
9798
end

Diff for: spec/integration/outputs/unsupported_actions_spec.rb

+15-10
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,21 @@ def get_es_output( options={} )
2727
@es.indices.delete(:index => "*") rescue nil
2828
# index single doc for update purpose
2929
@es.index(
30-
:index => INDEX,
31-
:type => doc_type,
32-
:id => "2",
33-
:body => { :message => 'Test to doc indexing', :counter => 1 }
30+
{
31+
:index => INDEX,
32+
:id => '2',
33+
:body => { :message => 'Test to doc indexing', :counter => 1 },
34+
:refresh => true
35+
}
3436
)
37+
3538
@es.index(
36-
:index => INDEX,
37-
:type => doc_type,
38-
:id => "3",
39-
:body => { :message => 'Test to doc deletion', :counter => 2 }
39+
{
40+
:index => INDEX,
41+
:id => '3',
42+
:body => { :message => 'Test to doc deletion', :counter => 2 },
43+
:refresh => true
44+
}
4045
)
4146
@es.indices.refresh
4247
end
@@ -63,12 +68,12 @@ def get_es_output( options={} )
6368
rejected_events = events.select { |event| !index_or_update.call(event) }
6469

6570
indexed_events.each do |event|
66-
response = @es.get(:index => INDEX, :type => doc_type, :id => event.get("doc_id"), :refresh => true)
71+
response = @es.get(:index => INDEX, :id => event.get("doc_id"), :refresh => true)
6772
expect(response['_source']['message']).to eq(event.get("message"))
6873
end
6974

7075
rejected_events.each do |event|
71-
expect {@es.get(:index => INDEX, :type => doc_type, :id => event.get("doc_id"), :refresh => true)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
76+
expect {@es.get(:index => INDEX, :id => event.get("doc_id"), :refresh => true)}.to raise_error(get_expected_error_class)
7277
end
7378
end
7479
end

0 commit comments

Comments
 (0)